Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 81 additions & 15 deletions src/api/middleware/logging.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,92 @@
return clone;
}

const loggingMiddleware = pinoHttp({
const baseLoggingMiddleware = pinoHttp({
logger,
customProps: (req) => ({
autoLogging: false, // Disable automatic logging so we can log manually with bodies
customProps: (req, res) => ({

Check failure on line 18 in src/api/middleware/logging.js

View workflow job for this annotation

GitHub Actions / Run Tests (24.x)

'res' is defined but never used

Check failure on line 18 in src/api/middleware/logging.js

View workflow job for this annotation

GitHub Actions / Run Tests (20.x)

'res' is defined but never used

Check failure on line 18 in src/api/middleware/logging.js

View workflow job for this annotation

GitHub Actions / Run Tests (22.x)

'res' is defined but never used

Check failure on line 18 in src/api/middleware/logging.js

View workflow job for this annotation

GitHub Actions / Run Tests (23.x)

'res' is defined but never used
sessionId: req.sessionId ?? null,
}),
customLogLevel: (req, res, err) => {
if (err || res.statusCode >= 500) return "error";
if (res.statusCode >= 400) return "warn";
return "info";
},
wrapSerializers: true,
serializers: {
req(req) {
return {
});

// Wrapper middleware to capture and log full request/response bodies
function loggingMiddleware(req, res, next) {
const startTime = Date.now();

// Log request with full body immediately
logger.info({
sessionId: req.sessionId ?? null,
req: {
method: req.method,
url: req.url,
headers: maskHeaders(req.headers),
},
requestBody: req.body, // Full request body without truncation
}, 'request started');

// Intercept res.write for streaming responses
const originalWrite = res.write;
const chunks = [];
res.write = function (chunk) {
if (chunk) {
chunks.push(Buffer.from(chunk));
}
return originalWrite.apply(this, arguments);
};

// Intercept res.send to capture the body
const originalSend = res.send;
res.send = function (body) {
res._capturedBody = body;

// Parse if it's a JSON string for better logging
if (typeof body === 'string') {
try {
res._capturedBody = JSON.parse(body);
} catch (e) {
res._capturedBody = body;
}
}

return originalSend.call(this, body);
};

// Log response when finished
res.on('finish', () => {
const responseTime = Date.now() - startTime;

// Capture streaming body if not already captured via send()
if (chunks.length > 0 && !res._capturedBody) {
const fullBody = Buffer.concat(chunks).toString('utf8');
res._capturedBody = {
type: 'stream',
contentType: res.getHeader('content-type'),
size: fullBody.length,
preview: fullBody.substring(0, 1000)
};
}

const logLevel = res.statusCode >= 500 ? 'error' : res.statusCode >= 400 ? 'warn' : 'info';

logger[logLevel]({
sessionId: req.sessionId ?? null,
req: {
method: req.method,
url: req.url,
headers: maskHeaders(req.headers),
};
},
},
});
},
res: {
statusCode: res.statusCode,
headers: res.getHeaders ? res.getHeaders() : res.headers,
},
requestBody: req.body, // Full request body without truncation
responseBody: res._capturedBody, // Full response body without truncation
responseTime,
}, 'request completed');
});

// Still call base middleware to set up req.log
baseLoggingMiddleware(req, res, next);
}

module.exports = loggingMiddleware;
18 changes: 16 additions & 2 deletions src/api/middleware/request-logging.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ function requestLoggingMiddleware(req, res, next) {
// Add to response headers
res.setHeader("X-Request-ID", requestId);

// Log request start
// Log request start with full body
logger.info(
{
requestId,
method: req.method,
path: req.path || req.url,
query: req.query,
body: req.body, // Full request body without truncation
ip: req.ip || req.socket.remoteAddress,
userAgent: req.headers["user-agent"],
},
Expand All @@ -43,7 +44,18 @@ function requestLoggingMiddleware(req, res, next) {
res.send = function (body) {
const duration = Date.now() - startTime;

// Log request completion
// Parse body if it's a string
let responseBody = body;
if (typeof body === 'string') {
try {
responseBody = JSON.parse(body);
} catch (e) {
// Keep as string if not JSON
responseBody = body;
}
}

// Log request completion with full request and response bodies
logger.info(
{
requestId,
Expand All @@ -52,6 +64,8 @@ function requestLoggingMiddleware(req, res, next) {
status: res.statusCode,
duration,
contentLength: res.getHeader("content-length"),
requestBody: req.body, // Full request body for reference
responseBody, // Full response body without truncation
},
"Request completed"
);
Expand Down
77 changes: 76 additions & 1 deletion src/api/router.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const openaiRouter = require("./openai-router");
const providersRouter = require("./providers-handler");
const { getRoutingHeaders, getRoutingStats, analyzeComplexity } = require("../routing");
const { validateCwd } = require("../workspace");
const logger = require("../logger");

const router = express.Router();

Expand Down Expand Up @@ -121,6 +122,13 @@ router.post("/v1/messages", rateLimiter, async (req, res, next) => {
const wantsStream = Boolean(req.query?.stream === 'true' || req.body?.stream);
const hasTools = Array.isArray(req.body?.tools) && req.body.tools.length > 0;

logger.info({
sessionId: req.headers['x-claude-session-id'],
wantsStream,
hasTools,
willUseStreamingPath: wantsStream || hasTools
}, "=== REQUEST ROUTING DECISION ===");

// Analyze complexity for routing headers (Phase 3)
const complexity = analyzeComplexity(req.body);
const routingHeaders = getRoutingHeaders({
Expand Down Expand Up @@ -338,6 +346,13 @@ router.post("/v1/messages", rateLimiter, async (req, res, next) => {

// Legacy streaming wrapper (for tool-based requests that requested streaming)
if (wantsStream && hasTools) {
logger.info({
sessionId: req.headers['x-claude-session-id'],
pathType: 'legacy_streaming_wrapper',
wantsStream,
hasTools
}, "=== USING LEGACY STREAMING WRAPPER (TOOL-BASED WITH STREAMING) ===");

metrics.recordStreamingStart();
res.set({
"Content-Type": "text/event-stream",
Expand All @@ -359,6 +374,13 @@ router.post("/v1/messages", rateLimiter, async (req, res, next) => {
// Use proper Anthropic SSE format
const msg = result.body;

logger.info({
sessionId: req.headers['x-claude-session-id'],
eventType: 'message_start',
streamingWithTools: true,
hasContent: !!(msg.content && msg.content.length > 0)
}, "=== SENDING SSE MESSAGE_START ===");

// 1. message_start
res.write(`event: message_start\n`);
res.write(`data: ${JSON.stringify({
Expand Down Expand Up @@ -419,9 +441,52 @@ router.post("/v1/messages", rateLimiter, async (req, res, next) => {

res.write(`event: content_block_stop\n`);
res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: i })}\n\n`);
} else if (block.type === "tool_result") {
// === TOOL_RESULT SSE STREAMING - ENTERED ===
logger.info({
blockIndex: i,
blockType: block.type,
toolUseId: block.tool_use_id,
contentType: typeof block.content,
contentLength: typeof block.content === 'string' ? block.content.length : JSON.stringify(block.content).length
}, "=== SSE: STREAMING TOOL_RESULT BLOCK - START ===");

// Stream tool_result blocks so CLI can display actual tool output
res.write(`event: content_block_start\n`);
res.write(`data: ${JSON.stringify({
type: "content_block_start",
index: i,
content_block: { type: "tool_result", tool_use_id: block.tool_use_id, content: "" }
})}\n\n`);

// Stream the actual content
const content = typeof block.content === 'string'
? block.content
: JSON.stringify(block.content);

logger.info({
blockIndex: i,
contentLength: content.length,
contentPreview: content.substring(0, 200)
}, "=== SSE: STREAMING TOOL_RESULT CONTENT ===");

res.write(`event: content_block_delta\n`);
res.write(`data: ${JSON.stringify({
type: "content_block_delta",
index: i,
delta: { type: "tool_result_delta", content: content }
})}\n\n`);

res.write(`event: content_block_stop\n`);
res.write(`data: ${JSON.stringify({ type: "content_block_stop", index: i })}\n\n`);

// === TOOL_RESULT SSE STREAMING - COMPLETED ===
logger.info({
blockIndex: i,
toolUseId: block.tool_use_id
}, "=== SSE: STREAMING TOOL_RESULT BLOCK - END ===");
}
}

// 3. message_delta with stop_reason
res.write(`event: message_delta\n`);
res.write(`data: ${JSON.stringify({
Expand Down Expand Up @@ -454,6 +519,16 @@ router.post("/v1/messages", rateLimiter, async (req, res, next) => {
});
}


// DIAGNOSTIC: Log response being sent to client
logger.info({
status: result.status,
hasBody: !!result.body,
bodyKeys: result.body ? Object.keys(result.body) : [],
bodyType: typeof result.body,
contentLength: result.body ? JSON.stringify(result.body).length : 0
}, "=== SENDING RESPONSE TO CLIENT ===");

metrics.recordResponse(result.status);
res.status(result.status).send(result.body);
} catch (error) {
Expand Down
69 changes: 69 additions & 0 deletions src/clients/ollama-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,65 @@ function convertAnthropicToolsToOllama(anthropicTools) {
}));
}

/**
 * Extract tool call from text when LLM outputs JSON instead of using tool_calls
 * Handles formats like: {"name": "Read", "parameters": {...}}
 *
 * @param {string} text - Text content that may contain JSON tool call
 * @returns {object|null} - Tool call object in Ollama format, or null if not found
 */
function extractToolCallFromText(text) {
  if (!text || typeof text !== 'string') return null;

  // Find potential JSON start - look for {"name" pattern
  const startMatch = text.match(/\{\s*"name"\s*:/);
  if (!startMatch) return null;

  const startIdx = startMatch.index;

  // Find the matching closing brace. The scan must be string-aware:
  // a '{' or '}' inside a JSON string literal (e.g. a file path or code
  // snippet in the parameters) must not affect the nesting depth, and an
  // escaped quote (\") inside a string must not end the string.
  let depth = 0;
  let inString = false;
  let escaped = false;
  let endIdx = -1;
  for (let i = startIdx; i < text.length; i++) {
    const ch = text[i];
    if (escaped) {
      // Previous char was a backslash inside a string: skip this char.
      escaped = false;
      continue;
    }
    if (inString) {
      if (ch === '\\') escaped = true;
      else if (ch === '"') inString = false;
      continue;
    }
    if (ch === '"') {
      inString = true;
    } else if (ch === '{') {
      depth++;
    } else if (ch === '}') {
      depth--;
      if (depth === 0) {
        endIdx = i + 1;
        break;
      }
    }
  }

  // No balanced closing brace found - treat as plain text, not a tool call.
  if (endIdx === -1) return null;

  const jsonStr = text.substring(startIdx, endIdx);

  try {
    const parsed = JSON.parse(jsonStr);

    // Both fields are required for this to count as a tool call.
    if (!parsed.name || !parsed.parameters) {
      return null;
    }

    logger.info({
      toolName: parsed.name,
      params: parsed.parameters,
      originalText: text.substring(0, 200)
    }, "Extracted tool call from text content (fallback parsing)");

    return {
      function: {
        name: parsed.name,
        arguments: parsed.parameters
      }
    };
  } catch (e) {
    // Malformed JSON: log at debug level and fall through to "no tool call".
    logger.debug({ error: e.message, text: text.substring(0, 200) }, "Failed to parse extracted tool call");
    return null;
  }
}

/**
* Convert Ollama tool call response to Anthropic format
*
Expand Down Expand Up @@ -126,6 +185,15 @@ function convertOllamaToolCallsToAnthropic(ollamaResponse) {
const toolCalls = message.tool_calls || [];
const textContent = message.content || "";

// FALLBACK: If no tool_calls but text contains JSON tool call, parse it
if (toolCalls.length === 0 && textContent) {
const extracted = extractToolCallFromText(textContent);
if (extracted) {
logger.info({ extractedTool: extracted.function?.name }, "Using fallback text parsing for tool call");
toolCalls = [extracted];
}
}

const contentBlocks = [];

// Add text content if present
Expand Down Expand Up @@ -217,4 +285,5 @@ module.exports = {
convertOllamaToolCallsToAnthropic,
buildAnthropicResponseFromOllama,
modelNameSupportsTools,
extractToolCallFromText,
};
3 changes: 3 additions & 0 deletions src/config/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ if (!["server", "client", "passthrough"].includes(toolExecutionMode)) {
"TOOL_EXECUTION_MODE must be one of: server, client, passthrough (default: server)"
);
}
console.log(`[CONFIG] Tool execution mode: ${toolExecutionMode}`);

// Memory system configuration (Titans-inspired long-term memory)
const memoryEnabled = process.env.MEMORY_ENABLED !== "false"; // default true
Expand Down Expand Up @@ -342,6 +343,8 @@ const databricksUrl =
? `${rawBaseUrl}${endpointPath.startsWith("/") ? "" : "/"}${endpointPath}`
: null;

// Set MODEL_DEFAULT env var to use a specific model (e.g. "llama3.1" for Ollama).
// Without it, the default falls back to a Databricks Claude model regardless of MODEL_PROVIDER.
const defaultModel =
process.env.MODEL_DEFAULT ??
(modelProvider === "azure-anthropic" ? "claude-opus-4-5" : "databricks-claude-sonnet-4-5");
Expand Down
Loading
Loading