diff --git a/packages/codex_sdk/lib/codex_sdk.dart b/packages/codex_sdk/lib/codex_sdk.dart new file mode 100644 index 0000000..3a2356d --- /dev/null +++ b/packages/codex_sdk/lib/codex_sdk.dart @@ -0,0 +1,11 @@ +library codex_sdk; + +export 'src/client/codex_client.dart'; +export 'src/config/codex_config.dart'; +export 'src/config/codex_mcp_registry.dart'; +export 'src/protocol/codex_approval.dart'; +export 'src/protocol/codex_event.dart'; +export 'src/protocol/codex_event_mapper.dart'; +export 'src/protocol/codex_event_parser.dart'; +export 'src/protocol/json_rpc_message.dart'; +export 'src/transport/codex_transport.dart'; diff --git a/packages/codex_sdk/lib/src/client/codex_client.dart b/packages/codex_sdk/lib/src/client/codex_client.dart new file mode 100644 index 0000000..e549e43 --- /dev/null +++ b/packages/codex_sdk/lib/src/client/codex_client.dart @@ -0,0 +1,450 @@ +import 'dart:async'; +import 'dart:io'; + +import 'package:claude_sdk/claude_sdk.dart'; +import 'package:uuid/uuid.dart'; + +import '../config/codex_config.dart'; +import '../config/codex_mcp_registry.dart'; +import '../protocol/codex_approval.dart'; +import '../protocol/codex_event.dart'; +import '../protocol/codex_event_mapper.dart'; +import '../protocol/codex_event_parser.dart'; +import '../protocol/json_rpc_message.dart'; +import '../transport/codex_transport.dart'; + +/// Standalone client backed by Codex CLI (`codex app-server`). +/// +/// Uses a persistent subprocess communicating via JSON-RPC over stdin/stdout. +/// The transport layer handles message framing and request correlation. +/// Notifications are parsed into [CodexEvent]s, mapped to [ClaudeResponse] +/// objects, and fed through [ResponseProcessor] so the entire downstream +/// pipeline (Conversation, TUI, vide_server) works unchanged. +class CodexClient { + final CodexConfig codexConfig; + final List mcpServers; + final ResponseProcessor _responseProcessor = ResponseProcessor(); + final CodexEventParser _parser = CodexEventParser(); + final CodexEventMapper _mapper = CodexEventMapper(); + + final CodexTransport _transport = CodexTransport(); + + String? _threadId; + bool _isInitialized = false; + bool _isClosed = false; + final Completer _initializedCompleter = Completer(); + + final String _sessionId; + final String _workingDirectory; + + // Stream controllers + final _conversationController = StreamController.broadcast(); + final _turnCompleteController = StreamController.broadcast(); + final _statusController = StreamController.broadcast(); + final _initDataController = StreamController.broadcast(); + final _queuedMessageController = StreamController.broadcast(); + final _approvalRequestController = + StreamController.broadcast(); + + Conversation _currentConversation = Conversation.empty(); + ClaudeStatus _currentStatus = ClaudeStatus.ready; + MetaResponse? _initData; + + String? _queuedMessageText; + List? _queuedAttachments; + + void Function(MetaResponse response)? onMetaResponseReceived; + + CodexClient({required this.codexConfig, List? mcpServers}) + : mcpServers = mcpServers ?? [], + _sessionId = codexConfig.sessionId ?? const Uuid().v4(), + _workingDirectory = + codexConfig.workingDirectory ?? Directory.current.path { + // Auto-flush queued messages when turn completes + _turnCompleteController.stream.listen((_) { + _flushQueuedMessage(); + }); + } + + /// Initialize the client: start MCP servers, launch app-server, handshake, + /// start a thread, and wait for MCP startup to complete. + Future init() async { + if (_isInitialized) return; + + // Start MCP servers + for (final server in mcpServers) { + if (!server.isRunning) { + await server.start(); + } + } + + // Write MCP config before starting the app-server + if (mcpServers.isNotEmpty) { + await CodexMcpRegistry.writeConfig( + mcpServers: mcpServers, + workingDirectory: _workingDirectory, + ); + } + + // Start the persistent subprocess + await _transport.start(workingDirectory: _workingDirectory); + + // Subscribe to notifications → event pipeline + _transport.notifications.listen(_onNotification); + + // Subscribe to server requests → approval pipeline + _transport.serverRequests.listen(_onServerRequest); + + // Initialize handshake + final initResponse = await _transport.sendRequest('initialize', { + 'clientInfo': { + 'name': 'vide', + 'version': '0.1.0', + 'title': 'Vide', + }, + 'capabilities': {'experimentalApi': true}, + }); + + if (initResponse.isError) { + throw StateError( + 'Codex initialize failed: ${initResponse.error?.message}', + ); + } + + // Send initialized notification + _transport.sendNotification('initialized'); + + // Start a thread + final threadResponse = await _transport.sendRequest( + 'thread/start', + codexConfig.toThreadStartParams(), + ); + + if (threadResponse.isError) { + throw StateError( + 'Codex thread/start failed: ${threadResponse.error?.message}', + ); + } + + final threadResult = threadResponse.result ?? {}; + final thread = threadResult['thread'] as Map? ?? {}; + _threadId = thread['id'] as String?; + + // Wait for mcp_startup_complete notification + if (mcpServers.isNotEmpty) { + await _transport.notifications + .where((n) => n.method == 'codex/event/mcp_startup_complete') + .first + .timeout( + const Duration(seconds: 30), + onTimeout: () => throw TimeoutException( + 'Timed out waiting for MCP startup to complete', + ), + ); + } + + _isInitialized = true; + if (!_initializedCompleter.isCompleted) { + _initializedCompleter.complete(); + } + } + + // ============================================================ + // Public API + // ============================================================ + + String get sessionId => _sessionId; + + String get workingDirectory => _workingDirectory; + + String? get threadId => _threadId; + + Stream get conversation => _conversationController.stream; + + Conversation get currentConversation => _currentConversation; + + Future get initialized => _initializedCompleter.future; + + Stream get initDataStream => _initDataController.stream; + + MetaResponse? get initData => _initData; + + Stream get onTurnComplete => _turnCompleteController.stream; + + Stream get statusStream => _statusController.stream; + + ClaudeStatus get currentStatus => _currentStatus; + + Stream get queuedMessage => _queuedMessageController.stream; + + String? get currentQueuedMessage => _queuedMessageText; + + /// Approval requests from the server. + Stream get approvalRequests => + _approvalRequestController.stream; + + void clearQueuedMessage() { + _queuedMessageText = null; + _queuedAttachments = null; + _queuedMessageController.add(null); + } + + void sendMessage(Message message) { + if (message.text.trim().isEmpty && + (message.attachments?.isEmpty ?? true)) { + return; + } + + // If currently processing, queue the message + if (_currentConversation.isProcessing) { + _queueMessage(message); + return; + } + + // Add user message to conversation optimistically + final userMessage = ConversationMessage.user( + content: message.text, + attachments: message.attachments, + ); + _updateConversation( + _currentConversation + .addMessage(userMessage) + .withState(ConversationState.sendingMessage), + ); + + _updateStatus(ClaudeStatus.processing); + + _startTurn(message.text); + } + + /// Respond to a server approval request. + void respondToApproval( + dynamic requestId, + CodexApprovalDecision decision, + ) { + _transport.respondToRequest(requestId, {'decision': decision.toJson()}); + } + + void injectToolResult(ToolResultResponse toolResult) { + // Codex manages its own tool execution; this is a no-op. + // We still update the conversation for UI consistency. + if (_currentConversation.messages.isEmpty) return; + + final lastIndex = _currentConversation.messages.length - 1; + final lastMessage = _currentConversation.messages[lastIndex]; + if (lastMessage.role != MessageRole.assistant) return; + + final updatedMessage = lastMessage.copyWith( + responses: [...lastMessage.responses, toolResult], + ); + final updatedMessages = [..._currentConversation.messages]; + updatedMessages[lastIndex] = updatedMessage; + _updateConversation( + _currentConversation.copyWith(messages: updatedMessages), + ); + } + + Future abort() async { + if (!_transport.isRunning || _threadId == null) return; + await _transport.sendRequest('turn/interrupt', { + 'threadId': _threadId, + }); + _updateStatus(ClaudeStatus.ready); + } + + Future close() async { + _isClosed = true; + + await _transport.close(); + + for (final server in mcpServers) { + await server.stop(); + } + + await CodexMcpRegistry.cleanUp(workingDirectory: _workingDirectory); + + await _conversationController.close(); + await _turnCompleteController.close(); + await _statusController.close(); + await _initDataController.close(); + await _queuedMessageController.close(); + await _approvalRequestController.close(); + + _isInitialized = false; + } + + Future clearConversation() async { + _updateConversation(Conversation.empty()); + + // Start a new thread on the same persistent server + final threadResponse = await _transport.sendRequest( + 'thread/start', + codexConfig.toThreadStartParams(), + ); + + if (!threadResponse.isError) { + final threadResult = threadResponse.result ?? {}; + final thread = threadResult['thread'] as Map? ?? {}; + _threadId = thread['id'] as String?; + } + } + + T? getMcpServer(String name) { + try { + return mcpServers.whereType().firstWhere((s) => s.name == name); + } catch (_) { + return null; + } + } + + // ============================================================ + // Private implementation + // ============================================================ + + Future _startTurn(String prompt) async { + final turnResponse = await _transport.sendRequest('turn/start', { + 'threadId': _threadId, + 'input': [ + {'type': 'text', 'text': prompt}, + ], + }); + + if (turnResponse.isError) { + _handleError( + 'turn/start failed: ${turnResponse.error?.message ?? 'unknown'}', + ); + } + } + + void _onNotification(JsonRpcNotification notification) { + if (_isClosed) return; + + final event = _parser.parseNotification(notification); + _handleEvent(event); + } + + void _onServerRequest(JsonRpcRequest request) { + if (_isClosed) return; + + switch (request.method) { + case 'item/commandExecution/requestApproval': + final approval = CodexApprovalRequest.commandExecution( + requestId: request.id, + params: request.params, + ); + _approvalRequestController.add(approval); + case 'item/fileChange/requestApproval': + final approval = CodexApprovalRequest.fileChange( + requestId: request.id, + params: request.params, + ); + _approvalRequestController.add(approval); + case 'item/tool/requestUserInput': + final approval = CodexApprovalRequest.userInput( + requestId: request.id, + params: request.params, + ); + _approvalRequestController.add(approval); + } + } + + void _handleEvent(CodexEvent event) { + // Capture thread ID from thread/started notification + if (event is ThreadStartedEvent) { + _threadId = event.threadId; + } + + // Map to ClaudeResponse objects + final responses = _mapper.mapEvent(event); + + var conversation = _currentConversation; + var turnComplete = false; + + for (final response in responses) { + // Handle status updates + if (response is StatusResponse) { + _updateStatus(response.status); + } + + // Handle init data (MetaResponse) + if (response is MetaResponse) { + _initData = response; + if (!_isClosed) _initDataController.add(response); + onMetaResponseReceived?.call(response); + } + + // Process through ResponseProcessor + final result = + _responseProcessor.processResponse(response, conversation); + conversation = result.updatedConversation; + turnComplete = turnComplete || result.turnComplete; + } + + _updateConversation(conversation); + + if (turnComplete) { + _updateStatus(ClaudeStatus.ready); + if (!_isClosed) _turnCompleteController.add(null); + } + } + + void _handleError(String error) { + if (_isClosed) return; + final errorResponse = ErrorResponse( + id: 'codex_error_${DateTime.now().millisecondsSinceEpoch}', + timestamp: DateTime.now(), + error: error, + ); + + final result = _responseProcessor.processResponse( + errorResponse, + _currentConversation, + ); + _updateConversation(result.updatedConversation); + _updateStatus(ClaudeStatus.ready); + if (!_isClosed) _turnCompleteController.add(null); + } + + void _updateConversation(Conversation newConversation) { + if (_isClosed) return; + _currentConversation = newConversation; + _conversationController.add(_currentConversation); + } + + void _updateStatus(ClaudeStatus status) { + if (_isClosed) return; + if (_currentStatus != status) { + _currentStatus = status; + _statusController.add(status); + } + } + + void _queueMessage(Message message) { + if (_queuedMessageText == null) { + _queuedMessageText = message.text; + _queuedAttachments = message.attachments; + } else { + _queuedMessageText = '$_queuedMessageText\n${message.text}'; + if (message.attachments != null) { + _queuedAttachments = [ + ...(_queuedAttachments ?? []), + ...message.attachments!, + ]; + } + } + _queuedMessageController.add(_queuedMessageText); + } + + void _flushQueuedMessage() { + if (_queuedMessageText == null) return; + + final text = _queuedMessageText!; + final attachments = _queuedAttachments; + + _queuedMessageText = null; + _queuedAttachments = null; + _queuedMessageController.add(null); + + sendMessage(Message(text: text, attachments: attachments)); + } +} diff --git a/packages/codex_sdk/lib/src/config/codex_config.dart b/packages/codex_sdk/lib/src/config/codex_config.dart new file mode 100644 index 0000000..9b9118d --- /dev/null +++ b/packages/codex_sdk/lib/src/config/codex_config.dart @@ -0,0 +1,69 @@ +class CodexConfig { + final String? model; + final String? profile; + final String sandboxMode; + final String? workingDirectory; + final String? sessionId; + final String? appendSystemPrompt; + final bool skipGitRepoCheck; + final List? additionalDirs; + final String approvalPolicy; + + const CodexConfig({ + this.model, + this.profile, + this.sandboxMode = 'workspace-write', + this.workingDirectory, + this.sessionId, + this.appendSystemPrompt, + this.skipGitRepoCheck = false, + this.additionalDirs, + this.approvalPolicy = 'on-failure', + }); + + /// Build the params map for the `thread/start` JSON-RPC request. + Map toThreadStartParams() { + final params = {}; + + if (workingDirectory != null) { + params['cwd'] = workingDirectory; + } + + params['sandbox'] = sandboxMode; + params['approvalPolicy'] = approvalPolicy; + + if (model != null) { + params['model'] = model; + } + + if (appendSystemPrompt != null) { + params['developerInstructions'] = appendSystemPrompt; + } + + return params; + } + + CodexConfig copyWith({ + String? model, + String? profile, + String? sandboxMode, + String? workingDirectory, + String? sessionId, + String? appendSystemPrompt, + bool? skipGitRepoCheck, + List? additionalDirs, + String? approvalPolicy, + }) { + return CodexConfig( + model: model ?? this.model, + profile: profile ?? this.profile, + sandboxMode: sandboxMode ?? this.sandboxMode, + workingDirectory: workingDirectory ?? this.workingDirectory, + sessionId: sessionId ?? this.sessionId, + appendSystemPrompt: appendSystemPrompt ?? this.appendSystemPrompt, + skipGitRepoCheck: skipGitRepoCheck ?? this.skipGitRepoCheck, + additionalDirs: additionalDirs ?? this.additionalDirs, + approvalPolicy: approvalPolicy ?? this.approvalPolicy, + ); + } +} diff --git a/packages/codex_sdk/lib/src/config/codex_mcp_registry.dart b/packages/codex_sdk/lib/src/config/codex_mcp_registry.dart new file mode 100644 index 0000000..de8fa99 --- /dev/null +++ b/packages/codex_sdk/lib/src/config/codex_mcp_registry.dart @@ -0,0 +1,60 @@ +import 'dart:io'; + +import 'package:claude_sdk/claude_sdk.dart'; +import 'package:path/path.dart' as p; + +/// Writes a temporary `.codex/config.toml` in the working directory +/// so that `codex exec` discovers our MCP servers. +/// +/// Codex CLI reads MCP server configuration from its config file. +/// Each server is registered as a `[mcp_servers.]` section +/// with the `url` pointing to our running [McpServerBase] instances. +class CodexMcpRegistry { + /// Write `.codex/config.toml` with MCP server entries. + /// + /// If the file already exists, it is overwritten. The file is written + /// to `/.codex/config.toml`. + static Future writeConfig({ + required List mcpServers, + required String workingDirectory, + }) async { + if (mcpServers.isEmpty) return; + + final configDir = Directory(p.join(workingDirectory, '.codex')); + if (!configDir.existsSync()) { + configDir.createSync(recursive: true); + } + + final buffer = StringBuffer(); + buffer.writeln('# Auto-generated by codex_sdk — do not edit manually'); + buffer.writeln(); + + for (final server in mcpServers) { + if (!server.isRunning) continue; + + final config = server.toClaudeConfig(); + final url = config['url'] as String; + + buffer.writeln('[mcp_servers.${_sanitizeName(server.name)}]'); + buffer.writeln('url = "$url"'); + buffer.writeln(); + } + + final configFile = File(p.join(configDir.path, 'config.toml')); + await configFile.writeAsString(buffer.toString()); + } + + /// Remove the generated config file. + static Future cleanUp({required String workingDirectory}) async { + final configFile = File(p.join(workingDirectory, '.codex', 'config.toml')); + if (configFile.existsSync()) { + await configFile.delete(); + } + } + + /// Sanitize server name for use as a TOML key. + /// Replaces non-alphanumeric characters with underscores. + static String _sanitizeName(String name) { + return name.replaceAll(RegExp(r'[^a-zA-Z0-9_]'), '_'); + } +} diff --git a/packages/codex_sdk/lib/src/protocol/codex_approval.dart b/packages/codex_sdk/lib/src/protocol/codex_approval.dart new file mode 100644 index 0000000..c469080 --- /dev/null +++ b/packages/codex_sdk/lib/src/protocol/codex_approval.dart @@ -0,0 +1,124 @@ +/// Approval types for the Codex app-server. +/// +/// When the approval policy allows it, the server sends JSON-RPC requests +/// to the client asking for permission before executing commands or +/// modifying files. The client must respond with a decision. + +/// An approval request from the Codex server. +class CodexApprovalRequest { + /// The JSON-RPC request ID. Must be echoed back in the response. + final dynamic requestId; + + /// What type of approval is being requested. + final CodexApprovalType type; + + /// Thread, turn, and item context. + final String threadId; + final String turnId; + final String itemId; + + /// The command to be executed (for command approvals). + final String? command; + + /// The command's working directory. + final String? cwd; + + /// Reason for the approval request. + final String? reason; + + /// Proposed exec-policy amendment to auto-approve similar commands. + final List? proposedExecpolicyAmendment; + + /// Grant root path for file change approvals. + final String? grantRoot; + + /// Questions for user input requests. + final List>? questions; + + const CodexApprovalRequest({ + required this.requestId, + required this.type, + required this.threadId, + required this.turnId, + required this.itemId, + this.command, + this.cwd, + this.reason, + this.proposedExecpolicyAmendment, + this.grantRoot, + this.questions, + }); + + factory CodexApprovalRequest.commandExecution({ + required dynamic requestId, + required Map params, + }) { + return CodexApprovalRequest( + requestId: requestId, + type: CodexApprovalType.commandExecution, + threadId: params['threadId'] as String? ?? '', + turnId: params['turnId'] as String? ?? '', + itemId: params['itemId'] as String? ?? '', + command: params['command'] as String?, + cwd: params['cwd'] as String?, + reason: params['reason'] as String?, + proposedExecpolicyAmendment: + (params['proposedExecpolicyAmendment'] as List?) + ?.cast(), + ); + } + + factory CodexApprovalRequest.fileChange({ + required dynamic requestId, + required Map params, + }) { + return CodexApprovalRequest( + requestId: requestId, + type: CodexApprovalType.fileChange, + threadId: params['threadId'] as String? ?? '', + turnId: params['turnId'] as String? ?? '', + itemId: params['itemId'] as String? ?? '', + reason: params['reason'] as String?, + grantRoot: params['grantRoot'] as String?, + ); + } + + factory CodexApprovalRequest.userInput({ + required dynamic requestId, + required Map params, + }) { + return CodexApprovalRequest( + requestId: requestId, + type: CodexApprovalType.userInput, + threadId: params['threadId'] as String? ?? '', + turnId: params['turnId'] as String? ?? '', + itemId: params['itemId'] as String? ?? '', + questions: (params['questions'] as List?) + ?.cast>(), + ); + } +} + +/// The type of approval being requested. +enum CodexApprovalType { + commandExecution, + fileChange, + userInput, +} + +/// Decision for a command or file change approval request. +enum CodexApprovalDecision { + /// Approve this specific request. + accept, + + /// Approve and auto-approve identical requests for the session. + acceptForSession, + + /// Deny the request. The agent continues the turn. + decline, + + /// Deny the request and interrupt the turn. + cancel; + + String toJson() => name; +} diff --git a/packages/codex_sdk/lib/src/protocol/codex_event.dart b/packages/codex_sdk/lib/src/protocol/codex_event.dart new file mode 100644 index 0000000..53e0edd --- /dev/null +++ b/packages/codex_sdk/lib/src/protocol/codex_event.dart @@ -0,0 +1,401 @@ +import 'json_rpc_message.dart'; + +/// Codex app-server event types. +/// +/// These represent the JSON-RPC notifications sent by `codex app-server`. +/// Each event corresponds to a notification method (e.g. `item/started`, +/// `item/agentMessage/delta`). +sealed class CodexEvent { + const CodexEvent(); + + /// Parse a [JsonRpcNotification] into a typed [CodexEvent]. + factory CodexEvent.fromNotification(JsonRpcNotification notification) { + final method = notification.method; + final params = notification.params; + + return switch (method) { + // Thread lifecycle + 'thread/started' => ThreadStartedEvent.fromParams(params), + 'thread/name/updated' => ThreadNameUpdatedEvent.fromParams(params), + 'thread/tokenUsage/updated' => + TokenUsageUpdatedEvent.fromParams(params), + 'thread/compacted' => ThreadCompactedEvent.fromParams(params), + + // Turn lifecycle + 'turn/started' => TurnStartedEvent.fromParams(params), + 'turn/completed' => TurnCompletedEvent.fromParams(params), + + // Item lifecycle + 'item/started' => ItemStartedEvent.fromParams(params), + 'item/completed' => ItemCompletedEvent.fromParams(params), + + // Streaming deltas + 'item/agentMessage/delta' => AgentMessageDeltaEvent.fromParams(params), + 'item/reasoning/summaryTextDelta' => + ReasoningSummaryDeltaEvent.fromParams(params), + 'item/reasoning/textDelta' => + ReasoningTextDeltaEvent.fromParams(params), + 'item/commandExecution/outputDelta' => + CommandOutputDeltaEvent.fromParams(params), + 'item/fileChange/outputDelta' => + FileChangeOutputDeltaEvent.fromParams(params), + 'item/mcpToolCall/progress' => + McpToolCallProgressEvent.fromParams(params), + + // Legacy codex/event namespace (still emitted alongside new events) + 'codex/event/task_complete' => TaskCompleteEvent.fromParams(params), + 'codex/event/mcp_startup_complete' => const McpStartupCompleteEvent(), + + // Errors + 'error' => CodexErrorEvent.fromParams(params), + + // Everything else + _ => UnknownCodexEvent(method: method, params: params), + }; + } +} + +// --------------------------------------------------------------------------- +// Thread lifecycle +// --------------------------------------------------------------------------- + +class ThreadStartedEvent extends CodexEvent { + final String threadId; + final Map threadData; + + const ThreadStartedEvent({ + required this.threadId, + required this.threadData, + }); + + factory ThreadStartedEvent.fromParams(Map params) { + final thread = params['thread'] as Map? ?? {}; + return ThreadStartedEvent( + threadId: thread['id'] as String? ?? '', + threadData: thread, + ); + } +} + +class ThreadNameUpdatedEvent extends CodexEvent { + final String threadId; + final String name; + + const ThreadNameUpdatedEvent({ + required this.threadId, + required this.name, + }); + + factory ThreadNameUpdatedEvent.fromParams(Map params) { + return ThreadNameUpdatedEvent( + threadId: params['threadId'] as String? ?? '', + name: params['name'] as String? ?? '', + ); + } +} + +class ThreadCompactedEvent extends CodexEvent { + final Map params; + + const ThreadCompactedEvent({required this.params}); + + factory ThreadCompactedEvent.fromParams(Map params) { + return ThreadCompactedEvent(params: params); + } +} + +// --------------------------------------------------------------------------- +// Turn lifecycle +// --------------------------------------------------------------------------- + +class TurnStartedEvent extends CodexEvent { + final String turnId; + final Map turnData; + + const TurnStartedEvent({ + required this.turnId, + required this.turnData, + }); + + factory TurnStartedEvent.fromParams(Map params) { + final turn = params['turn'] as Map? ?? {}; + return TurnStartedEvent( + turnId: turn['id'] as String? ?? '', + turnData: turn, + ); + } +} + +class TurnCompletedEvent extends CodexEvent { + final String turnId; + final String status; + final Map turnData; + + const TurnCompletedEvent({ + required this.turnId, + required this.status, + required this.turnData, + }); + + factory TurnCompletedEvent.fromParams(Map params) { + final turn = params['turn'] as Map? ?? {}; + return TurnCompletedEvent( + turnId: turn['id'] as String? ?? '', + status: turn['status'] as String? ?? '', + turnData: turn, + ); + } +} + +// --------------------------------------------------------------------------- +// Item lifecycle +// --------------------------------------------------------------------------- + +class ItemStartedEvent extends CodexEvent { + final String itemId; + final String itemType; + final Map itemData; + + const ItemStartedEvent({ + required this.itemId, + required this.itemType, + required this.itemData, + }); + + factory ItemStartedEvent.fromParams(Map params) { + final item = params['item'] as Map? ?? {}; + return ItemStartedEvent( + itemId: item['id'] as String? ?? '', + itemType: item['type'] as String? ?? '', + itemData: item, + ); + } +} + +class ItemCompletedEvent extends CodexEvent { + final String itemId; + final String itemType; + final Map itemData; + + const ItemCompletedEvent({ + required this.itemId, + required this.itemType, + required this.itemData, + }); + + factory ItemCompletedEvent.fromParams(Map params) { + final item = params['item'] as Map? ?? {}; + return ItemCompletedEvent( + itemId: item['id'] as String? ?? '', + itemType: item['type'] as String? ?? '', + itemData: item, + ); + } +} + +// --------------------------------------------------------------------------- +// Streaming deltas +// --------------------------------------------------------------------------- + +class AgentMessageDeltaEvent extends CodexEvent { + final String itemId; + final String delta; + + const AgentMessageDeltaEvent({ + required this.itemId, + required this.delta, + }); + + factory AgentMessageDeltaEvent.fromParams(Map params) { + return AgentMessageDeltaEvent( + itemId: params['itemId'] as String? ?? '', + delta: params['delta'] as String? ?? '', + ); + } +} + +class ReasoningSummaryDeltaEvent extends CodexEvent { + final String itemId; + final String delta; + + const ReasoningSummaryDeltaEvent({ + required this.itemId, + required this.delta, + }); + + factory ReasoningSummaryDeltaEvent.fromParams(Map params) { + return ReasoningSummaryDeltaEvent( + itemId: params['itemId'] as String? ?? '', + delta: params['delta'] as String? ?? '', + ); + } +} + +class ReasoningTextDeltaEvent extends CodexEvent { + final String itemId; + final String delta; + + const ReasoningTextDeltaEvent({ + required this.itemId, + required this.delta, + }); + + factory ReasoningTextDeltaEvent.fromParams(Map params) { + return ReasoningTextDeltaEvent( + itemId: params['itemId'] as String? ?? '', + delta: params['delta'] as String? ?? '', + ); + } +} + +class CommandOutputDeltaEvent extends CodexEvent { + final String itemId; + final String delta; + + const CommandOutputDeltaEvent({ + required this.itemId, + required this.delta, + }); + + factory CommandOutputDeltaEvent.fromParams(Map params) { + return CommandOutputDeltaEvent( + itemId: params['itemId'] as String? ?? '', + delta: params['delta'] as String? ?? '', + ); + } +} + +class FileChangeOutputDeltaEvent extends CodexEvent { + final String itemId; + final String delta; + + const FileChangeOutputDeltaEvent({ + required this.itemId, + required this.delta, + }); + + factory FileChangeOutputDeltaEvent.fromParams(Map params) { + return FileChangeOutputDeltaEvent( + itemId: params['itemId'] as String? ?? '', + delta: params['delta'] as String? ?? '', + ); + } +} + +class McpToolCallProgressEvent extends CodexEvent { + final String itemId; + final Map params; + + const McpToolCallProgressEvent({ + required this.itemId, + required this.params, + }); + + factory McpToolCallProgressEvent.fromParams(Map params) { + return McpToolCallProgressEvent( + itemId: params['itemId'] as String? ?? '', + params: params, + ); + } +} + +// --------------------------------------------------------------------------- +// Token usage +// --------------------------------------------------------------------------- + +class TokenUsageUpdatedEvent extends CodexEvent { + final CodexUsage usage; + + const TokenUsageUpdatedEvent({required this.usage}); + + factory TokenUsageUpdatedEvent.fromParams(Map params) { + return TokenUsageUpdatedEvent( + usage: CodexUsage.fromJson(params), + ); + } +} + +// --------------------------------------------------------------------------- +// Legacy codex/event namespace +// --------------------------------------------------------------------------- + +class TaskCompleteEvent extends CodexEvent { + final String? lastAgentMessage; + final Map params; + + const TaskCompleteEvent({ + this.lastAgentMessage, + required this.params, + }); + + factory TaskCompleteEvent.fromParams(Map params) { + final msg = params['msg'] as Map? ?? {}; + return TaskCompleteEvent( + lastAgentMessage: msg['last_agent_message'] as String?, + params: params, + ); + } +} + +class McpStartupCompleteEvent extends CodexEvent { + const McpStartupCompleteEvent(); +} + +// --------------------------------------------------------------------------- +// Errors +// --------------------------------------------------------------------------- + +class CodexErrorEvent extends CodexEvent { + final String message; + final Map? details; + + const CodexErrorEvent({required this.message, this.details}); + + factory CodexErrorEvent.fromParams(Map params) { + final message = params['message'] as String? ?? 'Unknown error'; + return CodexErrorEvent(message: message, details: params); + } +} + +// --------------------------------------------------------------------------- +// Unknown / catch-all +// --------------------------------------------------------------------------- + +class UnknownCodexEvent extends CodexEvent { + final String method; + final Map params; + + const UnknownCodexEvent({required this.method, required this.params}); +} + +// --------------------------------------------------------------------------- +// Shared types +// --------------------------------------------------------------------------- + +class CodexUsage { + final int inputTokens; + final int cachedInputTokens; + final int outputTokens; + + const CodexUsage({ + required this.inputTokens, + required this.cachedInputTokens, + required this.outputTokens, + }); + + factory CodexUsage.fromJson(Map json) { + // Support both the thread/tokenUsage/updated format and legacy format + final usage = json['usage'] as Map? ?? json; + return CodexUsage( + inputTokens: usage['input_tokens'] as int? ?? + usage['inputTokens'] as int? ?? + 0, + cachedInputTokens: usage['cached_input_tokens'] as int? ?? + usage['cachedInputTokens'] as int? ?? + 0, + outputTokens: usage['output_tokens'] as int? ?? + usage['outputTokens'] as int? ?? + 0, + ); + } +} diff --git a/packages/codex_sdk/lib/src/protocol/codex_event_mapper.dart b/packages/codex_sdk/lib/src/protocol/codex_event_mapper.dart new file mode 100644 index 0000000..0dff382 --- /dev/null +++ b/packages/codex_sdk/lib/src/protocol/codex_event_mapper.dart @@ -0,0 +1,432 @@ +import 'package:claude_sdk/claude_sdk.dart'; + +import 'codex_event.dart'; + +/// Maps [CodexEvent]s to [ClaudeResponse] objects. +/// +/// This allows the entire downstream pipeline (ResponseProcessor, +/// Conversation, TUI, vide_server) to work unchanged — they all +/// consume ClaudeResponse types regardless of which backend produced them. +class CodexEventMapper { + int _idCounter = 0; + + String _nextId() => 'codex_${_idCounter++}'; + + /// Map a single Codex event to zero or more ClaudeResponse objects. + List mapEvent(CodexEvent event) { + return switch (event) { + // Thread lifecycle + ThreadStartedEvent e => _mapThreadStarted(e), + ThreadNameUpdatedEvent _ => [], + ThreadCompactedEvent _ => [], + + // Turn lifecycle + TurnStartedEvent _ => _mapTurnStarted(), + TurnCompletedEvent _ => _mapTurnCompleted(), + + // Item lifecycle + ItemStartedEvent e => _mapItemStarted(e), + ItemCompletedEvent e => _mapItemCompleted(e), + + // Streaming deltas + AgentMessageDeltaEvent e => _mapAgentMessageDelta(e), + ReasoningSummaryDeltaEvent _ => [], + ReasoningTextDeltaEvent _ => [], + CommandOutputDeltaEvent _ => [], + FileChangeOutputDeltaEvent _ => [], + McpToolCallProgressEvent _ => [], + + // Token usage + TokenUsageUpdatedEvent e => _mapTokenUsage(e), + + // Legacy events + TaskCompleteEvent e => _mapTaskComplete(e), + McpStartupCompleteEvent _ => [], + + // Errors + CodexErrorEvent e => _mapError(e), + + // Unknown + UnknownCodexEvent _ => [], + }; + } + + // -------------------------------------------------------------------------- + // Thread lifecycle + // -------------------------------------------------------------------------- + + List _mapThreadStarted(ThreadStartedEvent event) { + return [ + MetaResponse( + id: event.threadId, + timestamp: DateTime.now(), + metadata: {'session_id': event.threadId}, + ), + ]; + } + + // -------------------------------------------------------------------------- + // Turn lifecycle + // -------------------------------------------------------------------------- + + List _mapTurnStarted() { + return [ + StatusResponse( + id: _nextId(), + timestamp: DateTime.now(), + status: ClaudeStatus.processing, + ), + ]; + } + + List _mapTurnCompleted() { + return [ + CompletionResponse( + id: _nextId(), + timestamp: DateTime.now(), + stopReason: 'completed', + ), + ]; + } + + // -------------------------------------------------------------------------- + // Item lifecycle + // -------------------------------------------------------------------------- + + List _mapItemStarted(ItemStartedEvent event) { + return switch (event.itemType) { + 'agentMessage' => [], + 'commandExecution' => _mapCommandExecutionStarted(event), + 'fileChange' => _mapFileChangeStarted(event), + 'mcpToolCall' => _mapMcpToolCallStarted(event), + 'reasoning' => [], + 'webSearch' => _mapWebSearchStarted(event), + 'todoList' => [], + _ => [], + }; + } + + List _mapItemCompleted(ItemCompletedEvent event) { + return switch (event.itemType) { + 'agentMessage' => _mapAgentMessageCompleted(event), + 'commandExecution' => _mapCommandExecutionCompleted(event), + 'fileChange' => _mapFileChangeCompleted(event), + 'mcpToolCall' => _mapMcpToolCallCompleted(event), + 'reasoning' => _mapReasoningCompleted(event), + 'webSearch' => _mapWebSearchCompleted(event), + 'todoList' => _mapTodoListCompleted(event), + _ => [], + }; + } + + // -------------------------------------------------------------------------- + // Streaming deltas + // -------------------------------------------------------------------------- + + List _mapAgentMessageDelta(AgentMessageDeltaEvent event) { + return [ + TextResponse( + id: event.itemId, + timestamp: DateTime.now(), + content: event.delta, + isCumulative: false, + ), + ]; + } + + // -------------------------------------------------------------------------- + // Token usage + // -------------------------------------------------------------------------- + + List _mapTokenUsage(TokenUsageUpdatedEvent event) { + return [ + CompletionResponse( + id: _nextId(), + timestamp: DateTime.now(), + stopReason: 'usage_update', + inputTokens: event.usage.inputTokens, + outputTokens: event.usage.outputTokens, + cacheReadInputTokens: event.usage.cachedInputTokens, + ), + ]; + } + + // -------------------------------------------------------------------------- + // Legacy events + // -------------------------------------------------------------------------- + + List _mapTaskComplete(TaskCompleteEvent event) { + return [ + CompletionResponse( + id: _nextId(), + timestamp: DateTime.now(), + stopReason: 'completed', + ), + ]; + } + + // -------------------------------------------------------------------------- + // Errors + // -------------------------------------------------------------------------- + + List _mapError(CodexErrorEvent event) { + return [ + ErrorResponse( + id: _nextId(), + timestamp: DateTime.now(), + error: event.message, + details: event.details?.toString(), + ), + ]; + } + + // -------------------------------------------------------------------------- + // Item type handlers + // -------------------------------------------------------------------------- + + List _mapAgentMessageCompleted(ItemCompletedEvent event) { + final text = _extractStringField(event.itemData, 'text') ?? ''; + if (text.isEmpty) return []; + return [ + TextResponse( + id: event.itemId, + timestamp: DateTime.now(), + content: text, + isCumulative: true, + ), + ]; + } + + List _mapCommandExecutionStarted(ItemStartedEvent event) { + return [ + ToolUseResponse( + id: event.itemId, + timestamp: DateTime.now(), + toolName: 'Bash', + parameters: {'command': event.itemData['command'] as String? ?? ''}, + toolUseId: event.itemId, + ), + ]; + } + + List _mapCommandExecutionCompleted( + ItemCompletedEvent event, + ) { + final exitCode = event.itemData['exit_code'] as int?; + final output = + event.itemData['aggregated_output'] as String? ?? + event.itemData['output'] as String? ?? + ''; + return [ + ToolResultResponse( + id: '${event.itemId}_result', + timestamp: DateTime.now(), + toolUseId: event.itemId, + content: output, + isError: exitCode != null && exitCode != 0, + ), + ]; + } + + List _mapFileChangeStarted(ItemStartedEvent event) { + final changes = event.itemData['changes'] as List?; + final params = {}; + if (changes != null && changes.isNotEmpty) { + final paths = + changes.map((c) => (c as Map)['path'] as String? ?? '').toList(); + params['files'] = paths; + final kind = (changes.first as Map)['kind'] as String? ?? 'update'; + params['kind'] = kind; + } + final toolName = _inferFileToolName(changes); + return [ + ToolUseResponse( + id: event.itemId, + timestamp: DateTime.now(), + toolName: toolName, + parameters: params, + toolUseId: event.itemId, + ), + ]; + } + + List _mapFileChangeCompleted(ItemCompletedEvent event) { + final changes = event.itemData['changes'] as List?; + final summary = changes != null + ? changes + .map((c) { + final path = (c as Map)['path'] ?? ''; + final kind = c['kind'] ?? ''; + return '$kind: $path'; + }) + .join('\n') + : 'Done'; + return [ + ToolResultResponse( + id: '${event.itemId}_result', + timestamp: DateTime.now(), + toolUseId: event.itemId, + content: summary, + isError: false, + ), + ]; + } + + List _mapMcpToolCallStarted(ItemStartedEvent event) { + final serverLabel = event.itemData['server'] as String? ?? ''; + final toolName = event.itemData['tool'] as String? ?? ''; + final fullName = + serverLabel.isNotEmpty ? 'mcp__${serverLabel}__$toolName' : toolName; + final arguments = event.itemData['arguments']; + return [ + ToolUseResponse( + id: event.itemId, + timestamp: DateTime.now(), + toolName: fullName, + parameters: + arguments is Map + ? arguments + : {}, + toolUseId: event.itemId, + ), + ]; + } + + List _mapMcpToolCallCompleted(ItemCompletedEvent event) { + final error = event.itemData['error']; + final isError = error != null; + final content = isError + ? _extractErrorMessage(error) + : _extractMcpResult(event.itemData['result']); + return [ + ToolResultResponse( + id: '${event.itemId}_result', + timestamp: DateTime.now(), + toolUseId: event.itemId, + content: content, + isError: isError, + ), + ]; + } + + List _mapReasoningCompleted(ItemCompletedEvent event) { + final text = _extractStringField(event.itemData, 'text') ?? + _extractStringField(event.itemData, 'summary') ?? + ''; + if (text.isEmpty) return []; + return [ + TextResponse( + id: event.itemId, + timestamp: DateTime.now(), + content: text, + isCumulative: true, + ), + ]; + } + + List _mapWebSearchStarted(ItemStartedEvent event) { + final query = event.itemData['query'] as String? ?? ''; + return [ + ToolUseResponse( + id: event.itemId, + timestamp: DateTime.now(), + toolName: 'WebSearch', + parameters: {'query': query}, + toolUseId: event.itemId, + ), + ]; + } + + List _mapWebSearchCompleted(ItemCompletedEvent event) { + return [ + ToolResultResponse( + id: '${event.itemId}_result', + timestamp: DateTime.now(), + toolUseId: event.itemId, + content: 'Search complete', + isError: false, + ), + ]; + } + + List _mapTodoListCompleted(ItemCompletedEvent event) { + final items = event.itemData['items'] as List? ?? []; + if (items.isEmpty) return []; + final text = items + .map((item) { + final map = item as Map; + final completed = map['completed'] as bool? ?? false; + final label = map['text'] as String? ?? ''; + return '${completed ? '[x]' : '[ ]'} $label'; + }) + .join('\n'); + return [ + TextResponse( + id: event.itemId, + timestamp: DateTime.now(), + content: text, + isCumulative: true, + ), + ]; + } + + // -------------------------------------------------------------------------- + // Helpers + // -------------------------------------------------------------------------- + + String _inferFileToolName(List? changes) { + if (changes == null || changes.isEmpty) return 'Write'; + final kind = (changes.first as Map)['kind'] as String? ?? ''; + return switch (kind) { + 'add' => 'Write', + 'delete' => 'Write', + 'update' => 'Edit', + _ => 'Write', + }; + } + + String _extractMcpResult(dynamic result) { + if (result == null) return ''; + if (result is String) return result; + if (result is List) { + return result + .map((block) { + if (block is Map && block['type'] == 'text') { + return block['text'] as String? ?? ''; + } + if (block is Map) return block.toString(); + return block.toString(); + }) + .join('\n'); + } + if (result is Map) return result.toString(); + return result.toString(); + } + + String _extractErrorMessage(dynamic error) { + if (error is String) return error; + if (error is Map) return error['message'] as String? ?? error.toString(); + return error.toString(); + } + + /// Extracts a string from a field that may be a String or a List of content + /// blocks (e.g., `[{"type": "summary_text", "text": "..."}]`). + String? _extractStringField(Map data, String key) { + final value = data[key]; + if (value is String) return value; + if (value is List) { + final parts = value + .map((block) { + if (block is Map && block.containsKey('text')) { + return block['text'] as String? ?? ''; + } + if (block is String) return block; + return ''; + }) + .where((s) => s.isNotEmpty) + .toList(); + return parts.isEmpty ? null : parts.join('\n'); + } + return null; + } +} diff --git a/packages/codex_sdk/lib/src/protocol/codex_event_parser.dart b/packages/codex_sdk/lib/src/protocol/codex_event_parser.dart new file mode 100644 index 0000000..38c526c --- /dev/null +++ b/packages/codex_sdk/lib/src/protocol/codex_event_parser.dart @@ -0,0 +1,13 @@ +import 'codex_event.dart'; +import 'json_rpc_message.dart'; + +/// Converts [JsonRpcNotification]s from the transport into [CodexEvent]s. +/// +/// The transport layer handles JSONL framing and message routing. +/// This parser just maps typed notification objects to domain events. +class CodexEventParser { + /// Convert a [JsonRpcNotification] into a [CodexEvent]. + CodexEvent parseNotification(JsonRpcNotification notification) { + return CodexEvent.fromNotification(notification); + } +} diff --git a/packages/codex_sdk/lib/src/protocol/json_rpc_message.dart b/packages/codex_sdk/lib/src/protocol/json_rpc_message.dart new file mode 100644 index 0000000..2698d59 --- /dev/null +++ b/packages/codex_sdk/lib/src/protocol/json_rpc_message.dart @@ -0,0 +1,174 @@ +import 'dart:convert'; + +/// JSON-RPC message types for the Codex app-server protocol. +/// +/// The app-server communicates via JSONL (one JSON object per line) over +/// stdin/stdout. Messages follow JSON-RPC 2.0 conventions but omit the +/// `"jsonrpc":"2.0"` header. +/// +/// Three message patterns: +/// - **Notifications** (server→client): no `id`, just `method` + `params` +/// - **Requests** (bidirectional): has `id` + `method` + `params`, expects a response +/// - **Responses**: has `id` + `result`/`error`, correlates to a prior request +sealed class JsonRpcMessage { + const JsonRpcMessage(); + + factory JsonRpcMessage.fromJson(Map json) { + final hasId = json.containsKey('id'); + final hasMethod = json.containsKey('method'); + final hasResult = json.containsKey('result'); + final hasError = json.containsKey('error'); + + if (hasId && (hasResult || hasError) && !hasMethod) { + // Response to a request we sent + return JsonRpcResponse.fromJson(json); + } + + if (hasId && hasMethod) { + // Request from server (e.g., approval requests) + return JsonRpcRequest.fromJson(json); + } + + if (hasMethod && !hasId) { + // Notification from server + return JsonRpcNotification.fromJson(json); + } + + // Error response has id + error but no method + if (hasId && hasError) { + return JsonRpcResponse.fromJson(json); + } + + return JsonRpcNotification( + method: json['method'] as String? ?? 'unknown', + params: json, + ); + } + + /// Parse a single JSONL line into a [JsonRpcMessage]. + /// Returns null if the line is empty or cannot be parsed. + static JsonRpcMessage? parseLine(String line) { + final trimmed = line.trim(); + if (trimmed.isEmpty) return null; + + try { + final json = jsonDecode(trimmed) as Map; + return JsonRpcMessage.fromJson(json); + } catch (_) { + return null; + } + } +} + +/// A notification from the server (no response expected). +/// +/// Examples: `turn/started`, `item/completed`, `item/agentMessage/delta` +class JsonRpcNotification extends JsonRpcMessage { + final String method; + final Map params; + + const JsonRpcNotification({ + required this.method, + required this.params, + }); + + factory JsonRpcNotification.fromJson(Map json) { + return JsonRpcNotification( + method: json['method'] as String? ?? '', + params: json['params'] as Map? ?? {}, + ); + } + + @override + String toString() => 'JsonRpcNotification(method: $method)'; +} + +/// A request from the server that requires a response from the client. +/// +/// Used for approval requests: `item/commandExecution/requestApproval`, +/// `item/fileChange/requestApproval`, `item/tool/requestUserInput`. +class JsonRpcRequest extends JsonRpcMessage { + /// Request ID (String or int). Must be echoed back in the response. + final dynamic id; + final String method; + final Map params; + + const JsonRpcRequest({ + required this.id, + required this.method, + required this.params, + }); + + factory JsonRpcRequest.fromJson(Map json) { + return JsonRpcRequest( + id: json['id'], + method: json['method'] as String? ?? '', + params: json['params'] as Map? ?? {}, + ); + } + + @override + String toString() => 'JsonRpcRequest(id: $id, method: $method)'; +} + +/// A response to a request we sent to the server. +class JsonRpcResponse extends JsonRpcMessage { + /// Matches the `id` of the request this responds to. + final dynamic id; + final Map? result; + final JsonRpcError? error; + + const JsonRpcResponse({ + required this.id, + this.result, + this.error, + }); + + bool get isError => error != null; + + factory JsonRpcResponse.fromJson(Map json) { + final errorData = json['error']; + JsonRpcError? error; + if (errorData is Map) { + error = JsonRpcError.fromJson(errorData); + } + + final resultData = json['result']; + final Map? result = resultData is Map + ? Map.from(resultData) + : null; + + return JsonRpcResponse( + id: json['id'], + result: result, + error: error, + ); + } + + @override + String toString() => 'JsonRpcResponse(id: $id, isError: $isError)'; +} + +/// A JSON-RPC error object. +class JsonRpcError { + final int code; + final String message; + final dynamic data; + + const JsonRpcError({ + required this.code, + required this.message, + this.data, + }); + + factory JsonRpcError.fromJson(Map json) { + return JsonRpcError( + code: json['code'] as int? ?? -1, + message: json['message'] as String? ?? 'Unknown error', + data: json['data'], + ); + } + + @override + String toString() => 'JsonRpcError(code: $code, message: $message)'; +} diff --git a/packages/codex_sdk/lib/src/transport/codex_transport.dart b/packages/codex_sdk/lib/src/transport/codex_transport.dart new file mode 100644 index 0000000..e3f63fb --- /dev/null +++ b/packages/codex_sdk/lib/src/transport/codex_transport.dart @@ -0,0 +1,216 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; + +import '../protocol/json_rpc_message.dart'; + +/// Persistent subprocess transport for the Codex app-server. +/// +/// Manages a long-lived `codex app-server` process, communicating via +/// JSON-RPC over JSONL on stdin/stdout. Handles: +/// - Request/response correlation (auto-increments `id`, matches responses) +/// - Routing incoming messages to typed streams +/// - Subprocess lifecycle (start/close) +class CodexTransport { + Process? _process; + int _nextId = 0; + bool _closed = false; + + /// Pending requests waiting for a response, keyed by request id. + final _pendingRequests = >{}; + + /// Server notifications (no id, no response expected). + final _notificationController = + StreamController.broadcast(); + + /// Server requests that need a client response (e.g., approvals). + final _serverRequestController = + StreamController.broadcast(); + + /// Raw stderr output for error diagnostics. + final _stderrBuffer = StringBuffer(); + + /// Whether the transport is currently connected to a subprocess. + bool get isRunning => _process != null && !_closed; + + /// Server notifications stream. + Stream get notifications => + _notificationController.stream; + + /// Server requests stream (approval requests). + Stream get serverRequests => + _serverRequestController.stream; + + /// Start the `codex app-server` subprocess. + Future start({ + String? workingDirectory, + List extraArgs = const [], + }) async { + if (_closed) { + throw StateError('Transport has been closed'); + } + if (_process != null) { + throw StateError('Transport already started'); + } + + final args = ['app-server', ...extraArgs]; + + _process = await Process.start( + 'codex', + args, + workingDirectory: workingDirectory, + environment: Platform.environment, + ); + + _process!.stdout.transform(utf8.decoder).listen( + _onStdoutData, + onDone: _onProcessDone, + ); + + _process!.stderr.transform(utf8.decoder).listen((chunk) { + _stderrBuffer.write(chunk); + }); + } + + /// Send a JSON-RPC request and wait for the correlated response. + Future sendRequest( + String method, [ + Map? params, + ]) { + _ensureRunning(); + + final id = _nextId++; + final completer = Completer(); + _pendingRequests[id] = completer; + + final request = { + 'method': method, + 'id': id, + if (params != null) 'params': params, + }; + + _writeLine(jsonEncode(request)); + return completer.future; + } + + /// Send a JSON-RPC notification (fire-and-forget, no response expected). + void sendNotification(String method, [Map? params]) { + _ensureRunning(); + + final notification = { + 'method': method, + if (params != null) 'params': params, + }; + + _writeLine(jsonEncode(notification)); + } + + /// Respond to a server-initiated request (e.g., approval decisions). + void respondToRequest(dynamic requestId, Map result) { + _ensureRunning(); + + final response = { + 'id': requestId, + 'result': result, + }; + + _writeLine(jsonEncode(response)); + } + + /// Close the transport and kill the subprocess. + Future close() async { + if (_closed) return; + _closed = true; + + // Complete all pending requests with an error + for (final completer in _pendingRequests.values) { + if (!completer.isCompleted) { + completer.completeError( + StateError('Transport closed while request was pending'), + ); + } + } + _pendingRequests.clear(); + + _process?.kill(ProcessSignal.sigterm); + _process = null; + + await _notificationController.close(); + await _serverRequestController.close(); + } + + // -------------------------------------------------------------------------- + // Private + // -------------------------------------------------------------------------- + + final _lineBuffer = StringBuffer(); + + void _onStdoutData(String chunk) { + _lineBuffer.write(chunk); + final content = _lineBuffer.toString(); + final lines = content.split('\n'); + + // Keep the last potentially incomplete line + _lineBuffer.clear(); + if (!content.endsWith('\n') && lines.isNotEmpty) { + _lineBuffer.write(lines.removeLast()); + } else if (lines.isNotEmpty && lines.last.isEmpty) { + lines.removeLast(); + } + + for (final line in lines) { + final trimmed = line.trim(); + if (trimmed.isEmpty) continue; + _routeMessage(trimmed); + } + } + + void _routeMessage(String line) { + final message = JsonRpcMessage.parseLine(line); + if (message == null) return; + + switch (message) { + case JsonRpcResponse response: + final completer = _pendingRequests.remove(response.id); + if (completer != null && !completer.isCompleted) { + completer.complete(response); + } + case JsonRpcRequest request: + if (!_serverRequestController.isClosed) { + _serverRequestController.add(request); + } + case JsonRpcNotification notification: + if (!_notificationController.isClosed) { + _notificationController.add(notification); + } + } + } + + void _onProcessDone() { + if (_closed) return; + + // Complete all pending requests with an error + for (final completer in _pendingRequests.values) { + if (!completer.isCompleted) { + final stderr = _stderrBuffer.toString(); + completer.completeError( + StateError( + 'codex app-server process terminated unexpectedly' + '${stderr.isNotEmpty ? ': $stderr' : ''}', + ), + ); + } + } + _pendingRequests.clear(); + _process = null; + } + + void _writeLine(String json) { + _process!.stdin.writeln(json); + } + + void _ensureRunning() { + if (_closed) throw StateError('Transport has been closed'); + if (_process == null) throw StateError('Transport not started'); + } +} diff --git a/packages/codex_sdk/pubspec.yaml b/packages/codex_sdk/pubspec.yaml new file mode 100644 index 0000000..6173e5a --- /dev/null +++ b/packages/codex_sdk/pubspec.yaml @@ -0,0 +1,16 @@ +name: codex_sdk +description: Dart wrapper for OpenAI Codex CLI headless mode +version: 0.1.0 +resolution: workspace + +environment: + sdk: ^3.8.0 + +dependencies: + claude_sdk: any + uuid: ^4.5.1 + path: ^1.8.3 + +dev_dependencies: + test: ^1.24.0 + lints: ^3.0.0 diff --git a/packages/codex_sdk/test/codex_client_e2e_test.dart b/packages/codex_sdk/test/codex_client_e2e_test.dart new file mode 100644 index 0000000..3dcaee5 --- /dev/null +++ b/packages/codex_sdk/test/codex_client_e2e_test.dart @@ -0,0 +1,283 @@ +@Tags(['e2e']) +import 'dart:io'; + +import 'package:claude_sdk/claude_sdk.dart'; +import 'package:codex_sdk/codex_sdk.dart'; +import 'package:test/test.dart'; + +/// End-to-end test that runs a real `codex app-server` subprocess. +/// +/// Requires: +/// - `codex` CLI installed and on PATH +/// - Valid OpenAI API key configured +/// +/// Run with: dart test test/codex_client_e2e_test.dart --tags e2e +void main() { + late CodexClient client; + late Directory tempDir; + + setUpAll(() { + final result = Process.runSync('which', ['codex']); + if (result.exitCode != 0) { + fail('codex CLI not found on PATH — skipping e2e tests'); + } + }); + + setUp(() { + tempDir = Directory.systemTemp.createTempSync('codex_e2e_'); + client = CodexClient( + codexConfig: CodexConfig( + workingDirectory: tempDir.path, + skipGitRepoCheck: true, + approvalPolicy: 'never', + ), + ); + }); + + tearDown(() async { + await client.close(); + // Small delay to let any dangling async handlers settle + await Future.delayed(const Duration(milliseconds: 200)); + if (tempDir.existsSync()) { + tempDir.deleteSync(recursive: true); + } + }); + + test('sends a simple prompt and receives a complete conversation', () async { + await client.init(); + + // Subscribe to streams BEFORE sending the message to avoid races + final turnFuture = client.onTurnComplete.first; + final statuses = []; + final sub = client.statusStream.listen(statuses.add); + + client.sendMessage( + Message(text: 'Respond with exactly: "hello from codex"'), + ); + + await turnFuture.timeout( + const Duration(seconds: 60), + onTimeout: () => fail('Timed out waiting for turn completion'), + ); + await sub.cancel(); + + final conv = client.currentConversation; + expect(conv.messages, isNotEmpty); + + // First message should be the user message we sent + final userMsg = conv.messages.first; + expect(userMsg.role, MessageRole.user); + expect(userMsg.content, contains('hello from codex')); + + // Should have at least one assistant message + final assistantMessages = conv.messages + .where((m) => m.role == MessageRole.assistant) + .toList(); + expect( + assistantMessages, + isNotEmpty, + reason: 'Expected assistant messages in conversation', + ); + + // The assistant message should have responses with text content + final lastAssistant = assistantMessages.last; + expect( + lastAssistant.responses, + isNotEmpty, + reason: + 'No responses. Types: ${conv.messages.map((m) => '${m.role}: ${m.responses.map((r) => r.runtimeType).toList()}').toList()}', + ); + + final textResponses = lastAssistant.responses + .whereType() + .toList(); + expect( + textResponses, + isNotEmpty, + reason: + 'No TextResponse. Types: ${lastAssistant.responses.map((r) => '${r.runtimeType}(${r.id})').toList()}', + ); + + final allText = textResponses.map((r) => r.content).join(); + expect(allText, isNotEmpty); + + // Status should have gone through processing -> ready + expect(statuses, contains(ClaudeStatus.processing)); + expect(statuses.last, ClaudeStatus.ready); + expect(conv.isProcessing, isFalse); + }); + + test('captures thread ID from thread/start response', () async { + await client.init(); + + // Thread ID is set during init from thread/start response + expect( + client.threadId, + isNotNull, + reason: 'Expected threadId from thread/start', + ); + expect(client.threadId, isNotEmpty); + }); + + test('status transitions correctly during a turn', () async { + await client.init(); + + final statuses = []; + final sub = client.statusStream.listen(statuses.add); + final turnFuture = client.onTurnComplete.first; + + expect(client.currentStatus, ClaudeStatus.ready); + + client.sendMessage(Message(text: 'Say "test"')); + + // Should immediately transition to processing + expect(client.currentStatus, ClaudeStatus.processing); + + await turnFuture.timeout( + const Duration(seconds: 60), + onTimeout: () => fail('Timed out'), + ); + await sub.cancel(); + + expect(client.currentStatus, ClaudeStatus.ready); + expect(statuses.first, ClaudeStatus.processing); + expect(statuses.last, ClaudeStatus.ready); + }); + + test('abort sends turn/interrupt and resets status', () async { + await client.init(); + + final processingFuture = client.statusStream + .firstWhere((s) => s == ClaudeStatus.processing) + .timeout( + const Duration(seconds: 10), + onTimeout: () => ClaudeStatus.ready, + ); + + client.sendMessage( + Message( + text: + 'Write a very long essay about the complete history of computing ' + 'from ancient abacuses to quantum computing. Include at least 100 ' + 'detailed paragraphs covering every decade.', + ), + ); + + final status = await processingFuture; + if (status == ClaudeStatus.processing) { + await client.abort(); + expect(client.currentStatus, ClaudeStatus.ready); + } + // If it already finished, that's fine + }); + + test('clearConversation resets state and starts new thread', () async { + await client.init(); + + final turnFuture = client.onTurnComplete.first; + client.sendMessage(Message(text: 'Say "hello"')); + + await turnFuture.timeout( + const Duration(seconds: 60), + onTimeout: () => fail('Timed out'), + ); + + expect(client.currentConversation.messages, isNotEmpty); + final oldThreadId = client.threadId; + + await client.clearConversation(); + expect(client.currentConversation.messages, isEmpty); + + // Should have a new thread ID + expect(client.threadId, isNotNull); + expect(client.threadId, isNot(equals(oldThreadId))); + }); + + test('multi-turn sends follow-up on same thread', () async { + await client.init(); + + // Turn 1: establish a fact + final turn1Future = client.onTurnComplete.first; + client.sendMessage( + Message(text: 'Remember this number: 42. Just say "ok, remembered."'), + ); + + await turn1Future.timeout( + const Duration(seconds: 60), + onTimeout: () => fail('Timed out on turn 1'), + ); + + // Should have captured a thread ID during init + expect(client.threadId, isNotNull); + final threadId = client.threadId; + + // Turn 2: ask about the fact (same persistent thread) + final turn2Future = client.onTurnComplete.first; + client.sendMessage( + Message(text: 'What number did I just tell you to remember? Reply with just the number.'), + ); + + await turn2Future.timeout( + const Duration(seconds: 60), + onTimeout: () => fail('Timed out on turn 2'), + ); + + // Thread ID should be the same (persistent session) + expect(client.threadId, equals(threadId)); + + // Should have user + assistant messages from both turns + final conv = client.currentConversation; + final userMessages = conv.messages + .where((m) => m.role == MessageRole.user) + .toList(); + expect( + userMessages.length, + greaterThanOrEqualTo(2), + reason: 'Expected at least 2 user messages for multi-turn', + ); + + final assistantMessages = conv.messages + .where((m) => m.role == MessageRole.assistant) + .toList(); + expect( + assistantMessages.length, + greaterThanOrEqualTo(2), + reason: + 'Expected at least 2 assistant messages for multi-turn. ' + 'Messages: ${conv.messages.map((m) => '${m.role}(${m.responses.length} responses)').toList()}', + ); + + // The second assistant response should reference the number + final lastAssistant = assistantMessages.last; + final textResponses = lastAssistant.responses + .whereType() + .toList(); + expect( + textResponses, + isNotEmpty, + reason: + 'No TextResponse in last assistant. ' + 'Response types: ${lastAssistant.responses.map((r) => '${r.runtimeType}(${r.id})').toList()}. ' + 'All messages: ${conv.messages.map((m) => '${m.role}: ${m.responses.map((r) => '${r.runtimeType}').toList()}').toList()}', + ); + + final allText = textResponses.map((r) => r.content).join(); + expect( + allText, + contains('42'), + reason: 'Expected assistant to recall the number 42', + ); + }); + + test('cleans up .codex/config.toml on close', () async { + await client.init(); + + final codexDir = Directory('${tempDir.path}/.codex'); + codexDir.createSync(); + File('${codexDir.path}/config.toml').writeAsStringSync('# test config\n'); + + await client.close(); + + expect(File('${codexDir.path}/config.toml').existsSync(), isFalse); + }); +} diff --git a/packages/codex_sdk/test/codex_config_test.dart b/packages/codex_sdk/test/codex_config_test.dart new file mode 100644 index 0000000..de34cf4 --- /dev/null +++ b/packages/codex_sdk/test/codex_config_test.dart @@ -0,0 +1,119 @@ +import 'package:codex_sdk/codex_sdk.dart'; +import 'package:test/test.dart'; + +void main() { + group('CodexConfig.toThreadStartParams', () { + test('includes cwd when workingDirectory is set', () { + const config = CodexConfig(workingDirectory: '/tmp/project'); + final params = config.toThreadStartParams(); + expect(params['cwd'], '/tmp/project'); + }); + + test('omits cwd when workingDirectory is null', () { + const config = CodexConfig(); + final params = config.toThreadStartParams(); + expect(params.containsKey('cwd'), isFalse); + }); + + test('includes sandbox mode', () { + const config = CodexConfig(sandboxMode: 'danger-full-access'); + final params = config.toThreadStartParams(); + expect(params['sandbox'], 'danger-full-access'); + }); + + test('uses default sandbox mode workspace-write', () { + const config = CodexConfig(); + final params = config.toThreadStartParams(); + expect(params['sandbox'], 'workspace-write'); + }); + + test('includes approval policy', () { + const config = CodexConfig(approvalPolicy: 'never'); + final params = config.toThreadStartParams(); + expect(params['approvalPolicy'], 'never'); + }); + + test('uses default approval policy on-failure', () { + const config = CodexConfig(); + final params = config.toThreadStartParams(); + expect(params['approvalPolicy'], 'on-failure'); + }); + + test('includes model when set', () { + const config = CodexConfig(model: 'o3'); + final params = config.toThreadStartParams(); + expect(params['model'], 'o3'); + }); + + test('omits model when null', () { + const config = CodexConfig(); + final params = config.toThreadStartParams(); + expect(params.containsKey('model'), isFalse); + }); + + test('includes developerInstructions from appendSystemPrompt', () { + const config = CodexConfig(appendSystemPrompt: 'Be concise'); + final params = config.toThreadStartParams(); + expect(params['developerInstructions'], 'Be concise'); + }); + + test('omits developerInstructions when appendSystemPrompt is null', () { + const config = CodexConfig(); + final params = config.toThreadStartParams(); + expect(params.containsKey('developerInstructions'), isFalse); + }); + + test('builds complete params with all fields', () { + const config = CodexConfig( + model: 'o4-mini', + sandboxMode: 'read-only', + workingDirectory: '/home/user/project', + appendSystemPrompt: 'Be helpful', + approvalPolicy: 'on-request', + ); + final params = config.toThreadStartParams(); + expect(params['model'], 'o4-mini'); + expect(params['sandbox'], 'read-only'); + expect(params['cwd'], '/home/user/project'); + expect(params['developerInstructions'], 'Be helpful'); + expect(params['approvalPolicy'], 'on-request'); + }); + }); + + group('CodexConfig.copyWith', () { + test('copies all fields', () { + const original = CodexConfig( + model: 'o3', + profile: 'test', + sandboxMode: 'danger-full-access', + workingDirectory: '/tmp', + sessionId: 'session_1', + appendSystemPrompt: 'Be brief', + additionalDirs: ['/data'], + approvalPolicy: 'never', + ); + + final copy = original.copyWith(model: 'o4-mini'); + expect(copy.model, 'o4-mini'); + expect(copy.profile, 'test'); + expect(copy.sandboxMode, 'danger-full-access'); + expect(copy.workingDirectory, '/tmp'); + expect(copy.sessionId, 'session_1'); + expect(copy.appendSystemPrompt, 'Be brief'); + expect(copy.additionalDirs, ['/data']); + expect(copy.approvalPolicy, 'never'); + }); + + test('preserves values when no overrides given', () { + const original = CodexConfig(model: 'o3'); + final copy = original.copyWith(); + expect(copy.model, 'o3'); + }); + + test('can override approvalPolicy', () { + const original = CodexConfig(); + final copy = original.copyWith(approvalPolicy: 'never'); + expect(copy.approvalPolicy, 'never'); + }); + }); +} diff --git a/packages/codex_sdk/test/codex_event_mapper_test.dart b/packages/codex_sdk/test/codex_event_mapper_test.dart new file mode 100644 index 0000000..6f933d6 --- /dev/null +++ b/packages/codex_sdk/test/codex_event_mapper_test.dart @@ -0,0 +1,520 @@ +import 'package:claude_sdk/claude_sdk.dart'; +import 'package:codex_sdk/codex_sdk.dart'; +import 'package:test/test.dart'; + +void main() { + late CodexEventMapper mapper; + + setUp(() { + mapper = CodexEventMapper(); + }); + + group('CodexEventMapper.mapEvent', () { + group('thread/started', () { + test('maps to MetaResponse with session_id', () { + const event = ThreadStartedEvent( + threadId: 'thread_abc', + threadData: {}, + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + expect(responses[0], isA()); + final meta = responses[0] as MetaResponse; + expect(meta.id, 'thread_abc'); + expect(meta.metadata['session_id'], 'thread_abc'); + }); + }); + + group('turn/started', () { + test('maps to StatusResponse processing', () { + const event = TurnStartedEvent(turnId: '0', turnData: {}); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + expect(responses[0], isA()); + expect( + (responses[0] as StatusResponse).status, + ClaudeStatus.processing, + ); + }); + }); + + group('turn/completed', () { + test('maps to CompletionResponse', () { + const event = TurnCompletedEvent( + turnId: '0', + status: 'completed', + turnData: {}, + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + expect(responses[0], isA()); + final completion = responses[0] as CompletionResponse; + expect(completion.stopReason, 'completed'); + }); + }); + + group('task_complete', () { + test('maps to CompletionResponse', () { + const event = TaskCompleteEvent( + lastAgentMessage: 'Done!', + params: {}, + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + expect(responses[0], isA()); + expect((responses[0] as CompletionResponse).stopReason, 'completed'); + }); + }); + + group('error event', () { + test('maps to ErrorResponse', () { + const event = CodexErrorEvent(message: 'connection lost'); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + expect(responses[0], isA()); + expect((responses[0] as ErrorResponse).error, 'connection lost'); + }); + }); + + group('unknown event', () { + test('maps to empty list', () { + const event = UnknownCodexEvent(method: 'future/thing', params: {}); + final responses = mapper.mapEvent(event); + expect(responses, isEmpty); + }); + }); + + group('token usage', () { + test('maps to CompletionResponse with usage data', () { + final event = TokenUsageUpdatedEvent( + usage: CodexUsage( + inputTokens: 100, + cachedInputTokens: 50, + outputTokens: 200, + ), + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + expect(responses[0], isA()); + final completion = responses[0] as CompletionResponse; + expect(completion.stopReason, 'usage_update'); + expect(completion.inputTokens, 100); + expect(completion.outputTokens, 200); + expect(completion.cacheReadInputTokens, 50); + }); + }); + + group('agentMessage delta', () { + test('maps to non-cumulative TextResponse', () { + const event = AgentMessageDeltaEvent( + itemId: 'msg_001', + delta: 'Hello ', + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + expect(responses[0], isA()); + final text = responses[0] as TextResponse; + expect(text.id, 'msg_001'); + expect(text.content, 'Hello '); + expect(text.isCumulative, isFalse); + }); + }); + + group('agentMessage item completed', () { + test('maps to cumulative TextResponse', () { + const event = ItemCompletedEvent( + itemId: 'msg_001', + itemType: 'agentMessage', + itemData: {'text': 'Hello world'}, + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + expect(responses[0], isA()); + final text = responses[0] as TextResponse; + expect(text.id, 'msg_001'); + expect(text.content, 'Hello world'); + expect(text.isCumulative, isTrue); + }); + + test('returns empty for empty text', () { + const event = ItemCompletedEvent( + itemId: 'msg_001', + itemType: 'agentMessage', + itemData: {'text': ''}, + ); + expect(mapper.mapEvent(event), isEmpty); + }); + + test('returns empty for no text key', () { + const event = ItemCompletedEvent( + itemId: 'msg_001', + itemType: 'agentMessage', + itemData: {}, + ); + expect(mapper.mapEvent(event), isEmpty); + }); + }); + + group('commandExecution item', () { + test('maps started event to ToolUseResponse with Bash', () { + const event = ItemStartedEvent( + itemId: 'cmd_001', + itemType: 'commandExecution', + itemData: {'command': 'ls -la'}, + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + expect(responses[0], isA()); + final tool = responses[0] as ToolUseResponse; + expect(tool.toolName, 'Bash'); + expect(tool.parameters['command'], 'ls -la'); + expect(tool.toolUseId, 'cmd_001'); + }); + + test('maps completed event to ToolResultResponse', () { + const event = ItemCompletedEvent( + itemId: 'cmd_001', + itemType: 'commandExecution', + itemData: { + 'command': 'ls -la', + 'exit_code': 0, + 'aggregated_output': 'file1.dart\nfile2.dart', + }, + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + expect(responses[0], isA()); + final result = responses[0] as ToolResultResponse; + expect(result.toolUseId, 'cmd_001'); + expect(result.content, 'file1.dart\nfile2.dart'); + expect(result.isError, isFalse); + }); + + test('marks non-zero exit code as error', () { + const event = ItemCompletedEvent( + itemId: 'cmd_001', + itemType: 'commandExecution', + itemData: {'exit_code': 1, 'aggregated_output': 'command not found'}, + ); + final responses = mapper.mapEvent(event); + expect((responses[0] as ToolResultResponse).isError, isTrue); + }); + + test('falls back to output field when aggregated_output missing', () { + const event = ItemCompletedEvent( + itemId: 'cmd_001', + itemType: 'commandExecution', + itemData: {'exit_code': 0, 'output': 'fallback output'}, + ); + final responses = mapper.mapEvent(event); + expect((responses[0] as ToolResultResponse).content, 'fallback output'); + }); + }); + + group('fileChange item', () { + test('maps started event with changes to ToolUseResponse', () { + const event = ItemStartedEvent( + itemId: 'file_001', + itemType: 'fileChange', + itemData: { + 'changes': [ + {'path': 'lib/foo.dart', 'kind': 'add'}, + ], + }, + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + expect(responses[0], isA()); + final tool = responses[0] as ToolUseResponse; + expect(tool.toolName, 'Write'); + expect(tool.parameters['files'], ['lib/foo.dart']); + expect(tool.parameters['kind'], 'add'); + }); + + test('infers Edit tool for update kind', () { + const event = ItemStartedEvent( + itemId: 'file_001', + itemType: 'fileChange', + itemData: { + 'changes': [ + {'path': 'lib/foo.dart', 'kind': 'update'}, + ], + }, + ); + final responses = mapper.mapEvent(event); + expect((responses[0] as ToolUseResponse).toolName, 'Edit'); + }); + + test('maps completed event to summary ToolResultResponse', () { + const event = ItemCompletedEvent( + itemId: 'file_001', + itemType: 'fileChange', + itemData: { + 'changes': [ + {'path': 'lib/foo.dart', 'kind': 'add'}, + {'path': 'lib/bar.dart', 'kind': 'update'}, + ], + }, + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + final result = responses[0] as ToolResultResponse; + expect(result.content, 'add: lib/foo.dart\nupdate: lib/bar.dart'); + expect(result.isError, isFalse); + }); + + test('handles completed event with no changes', () { + const event = ItemCompletedEvent( + itemId: 'file_001', + itemType: 'fileChange', + itemData: {}, + ); + final responses = mapper.mapEvent(event); + expect((responses[0] as ToolResultResponse).content, 'Done'); + }); + }); + + group('mcpToolCall item', () { + test('maps started event with server prefix', () { + const event = ItemStartedEvent( + itemId: 'mcp_001', + itemType: 'mcpToolCall', + itemData: { + 'server': 'vide-git', + 'tool': 'gitStatus', + 'arguments': {'detailed': true}, + }, + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + final tool = responses[0] as ToolUseResponse; + expect(tool.toolName, 'mcp__vide-git__gitStatus'); + expect(tool.parameters['detailed'], true); + }); + + test('uses tool name alone when server is empty', () { + const event = ItemStartedEvent( + itemId: 'mcp_001', + itemType: 'mcpToolCall', + itemData: {'server': '', 'tool': 'someBuiltinTool', 'arguments': {}}, + ); + final responses = mapper.mapEvent(event); + expect((responses[0] as ToolUseResponse).toolName, 'someBuiltinTool'); + }); + + test('handles non-map arguments', () { + const event = ItemStartedEvent( + itemId: 'mcp_001', + itemType: 'mcpToolCall', + itemData: { + 'server': 'test', + 'tool': 'myTool', + 'arguments': 'not a map', + }, + ); + final responses = mapper.mapEvent(event); + expect((responses[0] as ToolUseResponse).parameters, isEmpty); + }); + + test('maps completed event with string result', () { + const event = ItemCompletedEvent( + itemId: 'mcp_001', + itemType: 'mcpToolCall', + itemData: {'result': 'some output'}, + ); + final responses = mapper.mapEvent(event); + final result = responses[0] as ToolResultResponse; + expect(result.content, 'some output'); + expect(result.isError, isFalse); + }); + + test('maps completed event with content block array result', () { + const event = ItemCompletedEvent( + itemId: 'mcp_001', + itemType: 'mcpToolCall', + itemData: { + 'result': [ + {'type': 'text', 'text': 'line 1'}, + {'type': 'text', 'text': 'line 2'}, + ], + }, + ); + final responses = mapper.mapEvent(event); + final result = responses[0] as ToolResultResponse; + expect(result.content, 'line 1\nline 2'); + }); + + test('maps completed event with error', () { + const event = ItemCompletedEvent( + itemId: 'mcp_001', + itemType: 'mcpToolCall', + itemData: {'error': 'tool not found'}, + ); + final responses = mapper.mapEvent(event); + final result = responses[0] as ToolResultResponse; + expect(result.content, 'tool not found'); + expect(result.isError, isTrue); + }); + }); + + group('reasoning item', () { + test('maps completed event to TextResponse', () { + const event = ItemCompletedEvent( + itemId: 'reason_001', + itemType: 'reasoning', + itemData: {'text': 'Let me think...'}, + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + final text = responses[0] as TextResponse; + expect(text.content, 'Let me think...'); + expect(text.isCumulative, isTrue); + }); + + test('falls back to summary field', () { + const event = ItemCompletedEvent( + itemId: 'reason_001', + itemType: 'reasoning', + itemData: {'summary': 'Thinking summary'}, + ); + final responses = mapper.mapEvent(event); + expect((responses[0] as TextResponse).content, 'Thinking summary'); + }); + + test('handles summary as list of content blocks', () { + const event = ItemCompletedEvent( + itemId: 'reason_001', + itemType: 'reasoning', + itemData: { + 'summary': [ + {'type': 'summary_text', 'text': 'Thinking about it'}, + ], + }, + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + expect( + (responses[0] as TextResponse).content, 'Thinking about it'); + }); + + test('returns empty for empty text', () { + const event = ItemCompletedEvent( + itemId: 'reason_001', + itemType: 'reasoning', + itemData: {'text': ''}, + ); + expect(mapper.mapEvent(event), isEmpty); + }); + }); + + group('webSearch item', () { + test('maps started event to WebSearch ToolUseResponse', () { + const event = ItemStartedEvent( + itemId: 'search_001', + itemType: 'webSearch', + itemData: {'query': 'dart async patterns'}, + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + final tool = responses[0] as ToolUseResponse; + expect(tool.toolName, 'WebSearch'); + expect(tool.parameters['query'], 'dart async patterns'); + }); + + test('maps completed event to ToolResultResponse', () { + const event = ItemCompletedEvent( + itemId: 'search_001', + itemType: 'webSearch', + itemData: {}, + ); + final responses = mapper.mapEvent(event); + final result = responses[0] as ToolResultResponse; + expect(result.content, 'Search complete'); + expect(result.isError, isFalse); + }); + }); + + group('todoList item', () { + test('maps completed event to checklist TextResponse', () { + const event = ItemCompletedEvent( + itemId: 'todo_001', + itemType: 'todoList', + itemData: { + 'items': [ + {'text': 'Write tests', 'completed': true}, + {'text': 'Fix bugs', 'completed': false}, + ], + }, + ); + final responses = mapper.mapEvent(event); + expect(responses, hasLength(1)); + final text = responses[0] as TextResponse; + expect(text.content, '[x] Write tests\n[ ] Fix bugs'); + expect(text.isCumulative, isTrue); + }); + + test('returns empty for empty items list', () { + const event = ItemCompletedEvent( + itemId: 'todo_001', + itemType: 'todoList', + itemData: {'items': []}, + ); + expect(mapper.mapEvent(event), isEmpty); + }); + }); + + group('unknown item type', () { + test('returns empty list for started', () { + const event = ItemStartedEvent( + itemId: 'x_001', + itemType: 'unknownFutureType', + itemData: {}, + ); + expect(mapper.mapEvent(event), isEmpty); + }); + + test('returns empty list for completed', () { + const event = ItemCompletedEvent( + itemId: 'x_001', + itemType: 'unknownFutureType', + itemData: {}, + ); + expect(mapper.mapEvent(event), isEmpty); + }); + }); + + group('events that map to empty', () { + test('ThreadNameUpdatedEvent maps to empty', () { + const event = ThreadNameUpdatedEvent(threadId: 't', name: 'n'); + expect(mapper.mapEvent(event), isEmpty); + }); + + test('McpStartupCompleteEvent maps to empty', () { + const event = McpStartupCompleteEvent(); + expect(mapper.mapEvent(event), isEmpty); + }); + + test('ReasoningSummaryDeltaEvent maps to empty', () { + const event = ReasoningSummaryDeltaEvent(itemId: 'r', delta: 'x'); + expect(mapper.mapEvent(event), isEmpty); + }); + + test('CommandOutputDeltaEvent maps to empty', () { + const event = CommandOutputDeltaEvent(itemId: 'c', delta: 'x'); + expect(mapper.mapEvent(event), isEmpty); + }); + }); + }); + + group('ID generation', () { + test('generates unique IDs across events', () { + final ids = {}; + for (var i = 0; i < 5; i++) { + const event = TurnStartedEvent(turnId: '0', turnData: {}); + final responses = mapper.mapEvent(event); + ids.add(responses[0].id); + } + expect(ids, hasLength(5)); + }); + }); +} diff --git a/packages/codex_sdk/test/codex_event_parser_test.dart b/packages/codex_sdk/test/codex_event_parser_test.dart new file mode 100644 index 0000000..a157f77 --- /dev/null +++ b/packages/codex_sdk/test/codex_event_parser_test.dart @@ -0,0 +1,191 @@ +import 'package:codex_sdk/codex_sdk.dart'; +import 'package:test/test.dart'; + +void main() { + late CodexEventParser parser; + + setUp(() { + parser = CodexEventParser(); + }); + + group('CodexEventParser.parseNotification', () { + test('parses thread/started', () { + final notification = JsonRpcNotification( + method: 'thread/started', + params: { + 'thread': {'id': 'thread_abc123'}, + }, + ); + final event = parser.parseNotification(notification); + expect(event, isA()); + expect((event as ThreadStartedEvent).threadId, 'thread_abc123'); + }); + + test('parses turn/started', () { + final notification = JsonRpcNotification( + method: 'turn/started', + params: { + 'turn': {'id': '0'}, + }, + ); + final event = parser.parseNotification(notification); + expect(event, isA()); + expect((event as TurnStartedEvent).turnId, '0'); + }); + + test('parses turn/completed', () { + final notification = JsonRpcNotification( + method: 'turn/completed', + params: { + 'turn': {'id': '0', 'status': 'completed'}, + }, + ); + final event = parser.parseNotification(notification); + expect(event, isA()); + final completed = event as TurnCompletedEvent; + expect(completed.turnId, '0'); + expect(completed.status, 'completed'); + }); + + test('parses item/started', () { + final notification = JsonRpcNotification( + method: 'item/started', + params: { + 'item': { + 'id': 'item_001', + 'type': 'agentMessage', + }, + }, + ); + final event = parser.parseNotification(notification); + expect(event, isA()); + final item = event as ItemStartedEvent; + expect(item.itemId, 'item_001'); + expect(item.itemType, 'agentMessage'); + }); + + test('parses item/completed', () { + final notification = JsonRpcNotification( + method: 'item/completed', + params: { + 'item': { + 'id': 'item_001', + 'type': 'agentMessage', + 'text': 'Hello!', + }, + }, + ); + final event = parser.parseNotification(notification); + expect(event, isA()); + final item = event as ItemCompletedEvent; + expect(item.itemId, 'item_001'); + expect(item.itemData['text'], 'Hello!'); + }); + + test('parses item/agentMessage/delta', () { + final notification = JsonRpcNotification( + method: 'item/agentMessage/delta', + params: { + 'itemId': 'item_001', + 'delta': 'Hello ', + }, + ); + final event = parser.parseNotification(notification); + expect(event, isA()); + final delta = event as AgentMessageDeltaEvent; + expect(delta.itemId, 'item_001'); + expect(delta.delta, 'Hello '); + }); + + test('parses item/reasoning/summaryTextDelta', () { + final notification = JsonRpcNotification( + method: 'item/reasoning/summaryTextDelta', + params: { + 'itemId': 'r_001', + 'delta': 'thinking...', + }, + ); + final event = parser.parseNotification(notification); + expect(event, isA()); + expect((event as ReasoningSummaryDeltaEvent).delta, 'thinking...'); + }); + + test('parses item/commandExecution/outputDelta', () { + final notification = JsonRpcNotification( + method: 'item/commandExecution/outputDelta', + params: { + 'itemId': 'cmd_001', + 'delta': 'output line', + }, + ); + final event = parser.parseNotification(notification); + expect(event, isA()); + expect((event as CommandOutputDeltaEvent).delta, 'output line'); + }); + + test('parses thread/tokenUsage/updated', () { + final notification = JsonRpcNotification( + method: 'thread/tokenUsage/updated', + params: { + 'usage': { + 'input_tokens': 100, + 'cached_input_tokens': 50, + 'output_tokens': 200, + }, + }, + ); + final event = parser.parseNotification(notification); + expect(event, isA()); + final usage = (event as TokenUsageUpdatedEvent).usage; + expect(usage.inputTokens, 100); + expect(usage.cachedInputTokens, 50); + expect(usage.outputTokens, 200); + }); + + test('parses codex/event/task_complete', () { + final notification = JsonRpcNotification( + method: 'codex/event/task_complete', + params: { + 'msg': { + 'type': 'task_complete', + 'last_agent_message': 'Done!', + }, + }, + ); + final event = parser.parseNotification(notification); + expect(event, isA()); + expect((event as TaskCompleteEvent).lastAgentMessage, 'Done!'); + }); + + test('parses codex/event/mcp_startup_complete', () { + final notification = JsonRpcNotification( + method: 'codex/event/mcp_startup_complete', + params: {}, + ); + final event = parser.parseNotification(notification); + expect(event, isA()); + }); + + test('parses error notification', () { + final notification = JsonRpcNotification( + method: 'error', + params: {'message': 'something broke'}, + ); + final event = parser.parseNotification(notification); + expect(event, isA()); + expect((event as CodexErrorEvent).message, 'something broke'); + }); + + test('returns UnknownCodexEvent for unrecognized method', () { + final notification = JsonRpcNotification( + method: 'future/unknown/event', + params: {'data': 42}, + ); + final event = parser.parseNotification(notification); + expect(event, isA()); + final unknown = event as UnknownCodexEvent; + expect(unknown.method, 'future/unknown/event'); + expect(unknown.params['data'], 42); + }); + }); +} diff --git a/packages/codex_sdk/test/codex_mcp_registry_test.dart b/packages/codex_sdk/test/codex_mcp_registry_test.dart new file mode 100644 index 0000000..5be1223 --- /dev/null +++ b/packages/codex_sdk/test/codex_mcp_registry_test.dart @@ -0,0 +1,53 @@ +import 'dart:io'; + +import 'package:codex_sdk/codex_sdk.dart'; +import 'package:test/test.dart'; + +void main() { + late Directory tempDir; + + setUp(() { + tempDir = Directory.systemTemp.createTempSync('codex_mcp_test_'); + }); + + tearDown(() { + if (tempDir.existsSync()) { + tempDir.deleteSync(recursive: true); + } + }); + + group('CodexMcpRegistry.writeConfig', () { + test('does nothing when mcpServers is empty', () async { + await CodexMcpRegistry.writeConfig( + mcpServers: [], + workingDirectory: tempDir.path, + ); + + final codexDir = Directory('${tempDir.path}/.codex'); + expect(codexDir.existsSync(), isFalse); + }); + }); + + group('CodexMcpRegistry.cleanUp', () { + test('removes config.toml if it exists', () async { + final codexDir = Directory('${tempDir.path}/.codex'); + codexDir.createSync(); + final configFile = File('${codexDir.path}/config.toml'); + configFile.writeAsStringSync('# test config'); + expect(configFile.existsSync(), isTrue); + + await CodexMcpRegistry.cleanUp(workingDirectory: tempDir.path); + expect(configFile.existsSync(), isFalse); + }); + + test('does nothing if config.toml does not exist', () async { + // Should not throw + await CodexMcpRegistry.cleanUp(workingDirectory: tempDir.path); + }); + + test('does nothing if .codex dir does not exist', () async { + await CodexMcpRegistry.cleanUp(workingDirectory: tempDir.path); + expect(Directory('${tempDir.path}/.codex').existsSync(), isFalse); + }); + }); +} diff --git a/packages/codex_sdk/test/json_rpc_message_test.dart b/packages/codex_sdk/test/json_rpc_message_test.dart new file mode 100644 index 0000000..1d92a2f --- /dev/null +++ b/packages/codex_sdk/test/json_rpc_message_test.dart @@ -0,0 +1,121 @@ +import 'dart:convert'; + +import 'package:codex_sdk/codex_sdk.dart'; +import 'package:test/test.dart'; + +void main() { + group('JsonRpcMessage.fromJson', () { + test('parses notification (method, no id)', () { + final json = {'method': 'turn/started', 'params': {'turn': {}}}; + final msg = JsonRpcMessage.fromJson(json); + expect(msg, isA()); + final notif = msg as JsonRpcNotification; + expect(notif.method, 'turn/started'); + expect(notif.params, {'turn': {}}); + }); + + test('parses request (method + id)', () { + final json = { + 'method': 'item/commandExecution/requestApproval', + 'id': 5, + 'params': {'command': 'rm -rf /'}, + }; + final msg = JsonRpcMessage.fromJson(json); + expect(msg, isA()); + final req = msg as JsonRpcRequest; + expect(req.id, 5); + expect(req.method, 'item/commandExecution/requestApproval'); + expect(req.params['command'], 'rm -rf /'); + }); + + test('parses response with result (id + result, no method)', () { + final json = { + 'id': 0, + 'result': {'userAgent': 'codex/0.98.0'}, + }; + final msg = JsonRpcMessage.fromJson(json); + expect(msg, isA()); + final resp = msg as JsonRpcResponse; + expect(resp.id, 0); + expect(resp.result?['userAgent'], 'codex/0.98.0'); + expect(resp.isError, isFalse); + }); + + test('parses error response', () { + final json = { + 'id': 1, + 'error': {'code': -32600, 'message': 'Invalid Request'}, + }; + final msg = JsonRpcMessage.fromJson(json); + expect(msg, isA()); + final resp = msg as JsonRpcResponse; + expect(resp.isError, isTrue); + expect(resp.error!.code, -32600); + expect(resp.error!.message, 'Invalid Request'); + }); + + test('parses response with string id', () { + final json = { + 'id': 'abc-123', + 'result': {}, + }; + final msg = JsonRpcMessage.fromJson(json); + expect(msg, isA()); + expect((msg as JsonRpcResponse).id, 'abc-123'); + }); + + test('notification defaults to empty params', () { + final json = {'method': 'test/event'}; + final msg = JsonRpcMessage.fromJson(json); + expect(msg, isA()); + expect((msg as JsonRpcNotification).params, isEmpty); + }); + }); + + group('JsonRpcMessage.parseLine', () { + test('parses valid JSONL', () { + final line = jsonEncode({'method': 'turn/started', 'params': {}}); + final msg = JsonRpcMessage.parseLine(line); + expect(msg, isA()); + }); + + test('returns null for empty line', () { + expect(JsonRpcMessage.parseLine(''), isNull); + }); + + test('returns null for whitespace', () { + expect(JsonRpcMessage.parseLine(' \t '), isNull); + }); + + test('returns null for invalid JSON', () { + expect(JsonRpcMessage.parseLine('not json'), isNull); + }); + + test('returns null for JSON array', () { + expect(JsonRpcMessage.parseLine('[1, 2]'), isNull); + }); + + test('trims whitespace before parsing', () { + final line = ' ${jsonEncode({'method': 'test', 'params': {}})} '; + final msg = JsonRpcMessage.parseLine(line); + expect(msg, isA()); + }); + }); + + group('JsonRpcError', () { + test('parses from JSON', () { + final json = {'code': -32601, 'message': 'Method not found', 'data': 42}; + final error = JsonRpcError.fromJson(json); + expect(error.code, -32601); + expect(error.message, 'Method not found'); + expect(error.data, 42); + }); + + test('defaults missing fields', () { + final error = JsonRpcError.fromJson({}); + expect(error.code, -1); + expect(error.message, 'Unknown error'); + expect(error.data, isNull); + }); + }); +} diff --git a/pubspec.yaml b/pubspec.yaml index 2da355d..58f71f2 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -11,6 +11,7 @@ environment: workspace: - packages/claude_sdk + - packages/codex_sdk - packages/flutter_runtime_mcp - packages/moondream_api - packages/vide_interface