Skip to content

Commit 78e701d

Browse files
authored
feat(metrics): add user_stats table that tracks more granular usage information per user (#177)
* added user_stats table to track total number of workflow runs * added metrics for different types of triggers and runs * add total_tokens_used and total_cost metrics * ran migrations for new table, fixed build issue. added a user/stats route to fetch all user stats for the dashboard we'll eventually create * fix bug with api deployment status not appearing
1 parent a2ff120 commit 78e701d

File tree

17 files changed

+1625
-43
lines changed

17 files changed

+1625
-43
lines changed

sim/app/api/schedules/execute/route.ts

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
import { NextRequest, NextResponse } from 'next/server'
22
import { Cron } from 'croner'
33
import { eq, lte } from 'drizzle-orm'
4+
import { sql } from 'drizzle-orm'
45
import { v4 as uuidv4 } from 'uuid'
56
import { z } from 'zod'
67
import { createLogger } from '@/lib/logs/console-logger'
78
import { persistExecutionError, persistExecutionLogs } from '@/lib/logs/execution-logger'
89
import { buildTraceSpans } from '@/lib/logs/trace-spans'
910
import { decryptSecret } from '@/lib/utils'
11+
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
1012
import { mergeSubblockState } from '@/stores/workflows/utils'
1113
import { BlockState, WorkflowState } from '@/stores/workflows/workflow/types'
1214
import { db } from '@/db'
13-
import { environment, workflow, workflowSchedule } from '@/db/schema'
15+
import { environment, userStats, workflow, workflowSchedule } from '@/db/schema'
1416
import { Executor } from '@/executor'
1517
import { Serializer } from '@/serializer'
1618

@@ -326,6 +328,25 @@ export async function GET(req: NextRequest) {
326328
)
327329
const result = await executor.execute(schedule.workflowId)
328330

331+
logger.info(`[${requestId}] Workflow execution completed: ${schedule.workflowId}`, {
332+
success: result.success,
333+
executionTime: result.metadata?.duration,
334+
})
335+
336+
// Update workflow run counts if execution was successful
337+
if (result.success) {
338+
await updateWorkflowRunCounts(schedule.workflowId)
339+
340+
// Track scheduled execution in user stats
341+
await db
342+
.update(userStats)
343+
.set({
344+
totalScheduledExecutions: sql`total_scheduled_executions + 1`,
345+
lastActive: new Date(),
346+
})
347+
.where(eq(userStats.userId, workflowRecord.userId))
348+
}
349+
329350
// Build trace spans from execution logs
330351
const { traceSpans, totalDuration } = buildTraceSpans(result)
331352

@@ -371,10 +392,6 @@ export async function GET(req: NextRequest) {
371392
nextRunAt: nextRetryAt,
372393
})
373394
.where(eq(workflowSchedule.id, schedule.id))
374-
375-
logger.debug(
376-
`[${requestId}] Scheduled retry for workflow ${schedule.workflowId} at ${nextRetryAt.toISOString()}`
377-
)
378395
}
379396
} catch (error: any) {
380397
logger.error(

sim/app/api/schedules/route.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { createLogger } from '@/lib/logs/console-logger'
66
import { db } from '@/db'
77
import { workflowSchedule } from '@/db/schema'
88

9-
const logger = createLogger('Scheduled API')
9+
const logger = createLogger('ScheduledAPI')
1010

1111
/**
1212
* Get schedule information for a workflow

sim/app/api/user/stats/route.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import { NextRequest, NextResponse } from 'next/server'
2+
import { eq, sql } from 'drizzle-orm'
3+
import { getSession } from '@/lib/auth'
4+
import { createLogger } from '@/lib/logs/console-logger'
5+
import { db } from '@/db'
6+
import { userStats, workflow } from '@/db/schema'
7+
8+
const logger = createLogger('UserStatsAPI')
9+
10+
/**
11+
* GET endpoint to retrieve user statistics including the count of workflows
12+
*/
13+
export async function GET(request: NextRequest) {
14+
try {
15+
// Get the user session
16+
const session = await getSession()
17+
if (!session?.user?.id) {
18+
logger.warn('Unauthorized user stats access attempt')
19+
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
20+
}
21+
22+
const userId = session.user.id
23+
24+
// Get workflow count for user
25+
const [workflowCountResult] = await db
26+
.select({ count: sql`count(*)::int` })
27+
.from(workflow)
28+
.where(eq(workflow.userId, userId))
29+
30+
const workflowCount = workflowCountResult?.count || 0
31+
32+
// Get user stats record
33+
const userStatsRecords = await db.select().from(userStats).where(eq(userStats.userId, userId))
34+
35+
// If no stats record exists, create one
36+
if (userStatsRecords.length === 0) {
37+
const newStats = {
38+
id: crypto.randomUUID(),
39+
userId,
40+
totalManualExecutions: 0,
41+
totalApiCalls: 0,
42+
totalWebhookTriggers: 0,
43+
totalScheduledExecutions: 0,
44+
totalTokensUsed: 0,
45+
totalCost: '0.00',
46+
lastActive: new Date(),
47+
}
48+
49+
await db.insert(userStats).values(newStats)
50+
51+
// Return the newly created stats with workflow count
52+
return NextResponse.json({
53+
...newStats,
54+
workflowCount,
55+
})
56+
}
57+
58+
// Return stats with workflow count
59+
const stats = userStatsRecords[0]
60+
return NextResponse.json({
61+
...stats,
62+
workflowCount,
63+
})
64+
} catch (error) {
65+
logger.error('Error fetching user stats:', error)
66+
return NextResponse.json({ error: 'Failed to fetch user statistics' }, { status: 500 })
67+
}
68+
}

sim/app/api/webhooks/trigger/[path]/route.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import { NextRequest, NextResponse } from 'next/server'
2-
import { and, eq } from 'drizzle-orm'
2+
import { and, eq, sql } from 'drizzle-orm'
33
import { v4 as uuidv4 } from 'uuid'
44
import { createLogger } from '@/lib/logs/console-logger'
55
import { persistExecutionError, persistExecutionLogs } from '@/lib/logs/execution-logger'
66
import { buildTraceSpans } from '@/lib/logs/trace-spans'
77
import { closeRedisConnection, hasProcessedMessage, markMessageAsProcessed } from '@/lib/redis'
88
import { decryptSecret } from '@/lib/utils'
9+
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
910
import { mergeSubblockStateAsync } from '@/stores/workflows/utils'
1011
import { db } from '@/db'
11-
import { environment, webhook, workflow } from '@/db/schema'
12+
import { environment, userStats, webhook, workflow } from '@/db/schema'
1213
import { Executor } from '@/executor'
1314
import { Serializer } from '@/serializer'
1415

@@ -559,6 +560,20 @@ async function processWebhook(
559560
executionTime: result.metadata?.duration,
560561
})
561562

563+
// Update workflow run counts if execution was successful
564+
if (result.success) {
565+
await updateWorkflowRunCounts(foundWorkflow.id)
566+
567+
// Track webhook trigger in user stats
568+
await db
569+
.update(userStats)
570+
.set({
571+
totalWebhookTriggers: sql`total_webhook_triggers + 1`,
572+
lastActive: new Date(),
573+
})
574+
.where(eq(userStats.userId, foundWorkflow.userId))
575+
}
576+
562577
// Build trace spans from execution logs
563578
const { traceSpans, totalDuration } = buildTraceSpans(result)
564579

sim/app/api/workflows/[id]/deploy/route.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,59 @@ const logger = createLogger('WorkflowDeployAPI')
1212
export const dynamic = 'force-dynamic'
1313
export const runtime = 'nodejs'
1414

15+
export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
16+
const requestId = crypto.randomUUID().slice(0, 8)
17+
const { id } = await params
18+
19+
try {
20+
logger.debug(`[${requestId}] Fetching deployment info for workflow: ${id}`)
21+
const validation = await validateWorkflowAccess(request, id, false)
22+
23+
if (validation.error) {
24+
logger.warn(`[${requestId}] Failed to fetch deployment info: ${validation.error.message}`)
25+
return createErrorResponse(validation.error.message, validation.error.status)
26+
}
27+
28+
// Fetch the workflow information including deployment details
29+
const result = await db
30+
.select({
31+
apiKey: workflow.apiKey,
32+
isDeployed: workflow.isDeployed,
33+
deployedAt: workflow.deployedAt,
34+
})
35+
.from(workflow)
36+
.where(eq(workflow.id, id))
37+
.limit(1)
38+
39+
if (result.length === 0) {
40+
logger.warn(`[${requestId}] Workflow not found: ${id}`)
41+
return createErrorResponse('Workflow not found', 404)
42+
}
43+
44+
const workflowData = result[0]
45+
46+
// If the workflow is not deployed, return appropriate response
47+
if (!workflowData.isDeployed || !workflowData.apiKey) {
48+
logger.info(`[${requestId}] Workflow is not deployed: ${id}`)
49+
return createSuccessResponse({
50+
isDeployed: false,
51+
apiKey: null,
52+
deployedAt: null,
53+
})
54+
}
55+
56+
logger.info(`[${requestId}] Successfully retrieved deployment info: ${id}`)
57+
return createSuccessResponse({
58+
apiKey: workflowData.apiKey,
59+
isDeployed: workflowData.isDeployed,
60+
deployedAt: workflowData.deployedAt,
61+
})
62+
} catch (error: any) {
63+
logger.error(`[${requestId}] Error fetching deployment info: ${id}`, error)
64+
return createErrorResponse(error.message || 'Failed to fetch deployment information', 500)
65+
}
66+
}
67+
1568
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
1669
const requestId = crypto.randomUUID().slice(0, 8)
1770
const { id } = await params

sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
import { NextRequest } from 'next/server'
2-
import { eq } from 'drizzle-orm'
2+
import { eq, sql } from 'drizzle-orm'
33
import { v4 as uuidv4 } from 'uuid'
44
import { z } from 'zod'
55
import { createLogger } from '@/lib/logs/console-logger'
66
import { persistExecutionError, persistExecutionLogs } from '@/lib/logs/execution-logger'
77
import { buildTraceSpans } from '@/lib/logs/trace-spans'
88
import { decryptSecret } from '@/lib/utils'
9+
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
910
import { mergeSubblockState } from '@/stores/workflows/utils'
1011
import { WorkflowState } from '@/stores/workflows/workflow/types'
1112
import { db } from '@/db'
12-
import { environment } from '@/db/schema'
13+
import { environment, userStats } from '@/db/schema'
1314
import { Executor } from '@/executor'
1415
import { Serializer } from '@/serializer'
1516
import { validateWorkflowAccess } from '../../middleware'
@@ -159,6 +160,20 @@ async function executeWorkflow(workflow: any, requestId: string, input?: any) {
159160
executionTime: result.metadata?.duration,
160161
})
161162

163+
// Update workflow run counts if execution was successful
164+
if (result.success) {
165+
await updateWorkflowRunCounts(workflowId)
166+
167+
// Track API call in user stats
168+
await db
169+
.update(userStats)
170+
.set({
171+
totalApiCalls: sql`total_api_calls + 1`,
172+
lastActive: new Date(),
173+
})
174+
.where(eq(userStats.userId, workflow.userId))
175+
}
176+
162177
// Build trace spans from execution logs
163178
const { traceSpans, totalDuration } = buildTraceSpans(result)
164179

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import { NextRequest, NextResponse } from 'next/server'
2+
import { eq, sql } from 'drizzle-orm'
3+
import { createLogger } from '@/lib/logs/console-logger'
4+
import { db } from '@/db'
5+
import { userStats, workflow } from '@/db/schema'
6+
7+
const logger = createLogger('WorkflowStatsAPI')
8+
9+
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
10+
const { id } = await params
11+
const searchParams = request.nextUrl.searchParams
12+
const runs = parseInt(searchParams.get('runs') || '1', 10)
13+
14+
if (isNaN(runs) || runs < 1 || runs > 100) {
15+
logger.error(`Invalid number of runs: ${runs}`)
16+
return NextResponse.json(
17+
{ error: 'Invalid number of runs. Must be between 1 and 100.' },
18+
{ status: 400 }
19+
)
20+
}
21+
22+
try {
23+
// Get workflow record
24+
const [workflowRecord] = await db.select().from(workflow).where(eq(workflow.id, id)).limit(1)
25+
26+
if (!workflowRecord) {
27+
return NextResponse.json({ error: `Workflow ${id} not found` }, { status: 404 })
28+
}
29+
30+
// Update workflow runCount
31+
try {
32+
await db
33+
.update(workflow)
34+
.set({
35+
runCount: workflowRecord.runCount + runs,
36+
lastRunAt: new Date(),
37+
})
38+
.where(eq(workflow.id, id))
39+
} catch (error) {
40+
logger.error('Error updating workflow runCount:', error)
41+
throw error
42+
}
43+
44+
// Upsert user stats record
45+
try {
46+
// Check if record exists
47+
const userStatsRecords = await db
48+
.select()
49+
.from(userStats)
50+
.where(eq(userStats.userId, workflowRecord.userId))
51+
52+
if (userStatsRecords.length === 0) {
53+
// Create new record if none exists
54+
await db.insert(userStats).values({
55+
id: crypto.randomUUID(),
56+
userId: workflowRecord.userId,
57+
totalManualExecutions: runs,
58+
totalApiCalls: 0,
59+
totalWebhookTriggers: 0,
60+
totalScheduledExecutions: 0,
61+
totalTokensUsed: 0,
62+
totalCost: '0.00',
63+
lastActive: new Date(),
64+
})
65+
} else {
66+
// Update existing record
67+
await db
68+
.update(userStats)
69+
.set({
70+
totalManualExecutions: sql`total_manual_executions + ${runs}`,
71+
lastActive: new Date(),
72+
})
73+
.where(eq(userStats.userId, workflowRecord.userId))
74+
}
75+
} catch (error) {
76+
logger.error(`Error upserting userStats for userId ${workflowRecord.userId}:`, error)
77+
// Don't rethrow - we want to continue even if this fails
78+
}
79+
80+
return NextResponse.json({
81+
success: true,
82+
runsAdded: runs,
83+
newTotal: workflowRecord.runCount + runs,
84+
})
85+
} catch (error) {
86+
logger.error('Error updating workflow stats:', error)
87+
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
88+
}
89+
}

sim/app/api/workflows/middleware.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { NextRequest } from 'next/server'
22
import { createLogger } from '@/lib/logs/console-logger'
3-
import { getWorkflowById } from '@/lib/workflows'
3+
import { getWorkflowById } from '@/lib/workflows/utils'
44

55
const logger = createLogger('WorkflowMiddleware')
66

0 commit comments

Comments
 (0)