Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions apps/sim/app/api/workflows/[id]/deployed/route.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { db, workflowDeploymentVersion } from '@sim/db'
import { createLogger } from '@sim/logger'
import { and, desc, eq } from 'drizzle-orm'
import type { NextRequest, NextResponse } from 'next/server'
import { verifyInternalToken } from '@/lib/auth/internal'
import { generateRequestId } from '@/lib/core/utils/request'
import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
import { validateWorkflowPermissions } from '@/lib/workflows/utils'
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'

Expand Down Expand Up @@ -43,21 +42,21 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
logger.debug(`[${requestId}] Internal API call for deployed workflow: ${id}`)
}

const [active] = await db
.select({ state: workflowDeploymentVersion.state })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.orderBy(desc(workflowDeploymentVersion.createdAt))
.limit(1)
let deployedState = null
try {
const data = await loadDeployedWorkflowState(id)
deployedState = {
blocks: data.blocks,
edges: data.edges,
loops: data.loops,
parallels: data.parallels,
variables: data.variables,
}
} catch {
deployedState = null
}
Comment thread
icecrasher321 marked this conversation as resolved.

const response = createSuccessResponse({
deployedState: active?.state || null,
})
const response = createSuccessResponse({ deployedState })
return addNoCacheHeaders(response)
} catch (error: any) {
logger.error(`[${requestId}] Error fetching deployed state: ${id}`, error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import { useCredentialSets } from '@/hooks/queries/credential-sets'
import { useOAuthCredentials } from '@/hooks/queries/oauth-credentials'
import { useOrganizations } from '@/hooks/queries/organization'
import { useSubscriptionData } from '@/hooks/queries/subscription'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { useCredentialRefreshTriggers } from '@/hooks/use-credential-refresh-triggers'
import { getMissingRequiredScopes } from '@/hooks/use-oauth-scope-status'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
Expand Down Expand Up @@ -55,7 +54,6 @@ export function CredentialSelector({
const [isEditing, setIsEditing] = useState(false)
const { activeWorkflowId } = useWorkflowRegistry()
const [storeValue, setStoreValue] = useSubBlockValue<string | null>(blockId, subBlock.id)
const { collaborativeSetSubblockValue } = useCollaborativeWorkflow()

const requiredScopes = subBlock.requiredScopes || []
const label = subBlock.placeholder || 'Select credential'
Expand Down Expand Up @@ -136,11 +134,6 @@ export function CredentialSelector({
if (!response.ok || cancelled) return
const data = await response.json()
if (!cancelled && data.credential?.displayName) {
if (data.credential.id !== selectedId) {
collaborativeSetSubblockValue(blockId, subBlock.id, data.credential.id, {
skipDependsOn: true,
})
}
setInaccessibleCredentialName(data.credential.displayName)
}
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ export function ToolCredentialSelector({
if (!response.ok || cancelled) return
const data = await response.json()
if (!cancelled && data.credential?.displayName) {
if (data.credential.id !== selectedId) {
onChangeRef.current(data.credential.id)
}
setInaccessibleCredentialName(data.credential.displayName)
}
} catch {
Expand Down
9 changes: 1 addition & 8 deletions apps/sim/hooks/use-collaborative-workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1232,12 +1232,7 @@ export function useCollaborativeWorkflow() {
)

const collaborativeSetSubblockValue = useCallback(
(
blockId: string,
subblockId: string,
value: any,
options?: { _visited?: Set<string>; skipDependsOn?: boolean }
) => {
(blockId: string, subblockId: string, value: any, options?: { _visited?: Set<string> }) => {
if (isApplyingRemoteChange.current) return

if (isBaselineDiffView) {
Expand All @@ -1263,8 +1258,6 @@ export function useCollaborativeWorkflow() {
})
}

if (options?.skipDependsOn) return

// Handle dependent subblock clearing (recursive calls)
try {
const visited = options?._visited || new Set<string>()
Expand Down
159 changes: 146 additions & 13 deletions apps/sim/lib/workflows/persistence/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import {
workflowEdges,
workflowSubflows,
} from '@sim/db'
import { credential } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import type { InferInsertModel, InferSelectModel } from 'drizzle-orm'
import { and, desc, eq, sql } from 'drizzle-orm'
import { and, desc, eq, inArray, sql } from 'drizzle-orm'
import type { Edge } from 'reactflow'
import { v4 as uuidv4 } from 'uuid'
import type { DbOrTx } from '@/lib/db/types'
Expand Down Expand Up @@ -99,8 +100,19 @@ export async function loadDeployedWorkflowState(workflowId: string): Promise<Dep

const state = active.state as WorkflowState & { variables?: Record<string, unknown> }

const [wfRow] = await db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)

const { blocks: migratedBlocks } = await migrateCredentialIds(
state.blocks || {},
wfRow?.workspaceId ?? undefined
)
Comment thread
icecrasher321 marked this conversation as resolved.
Outdated

return {
blocks: state.blocks || {},
blocks: migratedBlocks,
Comment thread
icecrasher321 marked this conversation as resolved.
edges: state.edges || [],
loops: state.loops || {},
parallels: state.parallels || {},
Expand Down Expand Up @@ -185,6 +197,103 @@ export function migrateAgentBlocksToMessagesFormat(
)
}

const CREDENTIAL_SUBBLOCK_IDS = new Set(['credential', 'triggerCredentials'])

/**
* Migrates legacy `account.id` values to `credential.id` in OAuth subblocks.
* Collects all potential legacy IDs in a single batch query for efficiency.
* Also migrates `tool.params.credential` in agent block tool arrays.
*/
async function migrateCredentialIds(
blocks: Record<string, BlockState>,
workspaceId?: string
): Promise<{ blocks: Record<string, BlockState>; migrated: boolean }> {
const potentialLegacyIds = new Set<string>()

for (const block of Object.values(blocks)) {
for (const [subBlockId, subBlock] of Object.entries(block.subBlocks || {})) {
const value = (subBlock as { value?: unknown }).value
if (
CREDENTIAL_SUBBLOCK_IDS.has(subBlockId) &&
typeof value === 'string' &&
value &&
!value.startsWith('cred_')
) {
potentialLegacyIds.add(value)
}

if (subBlockId === 'tools' && Array.isArray(value)) {
for (const tool of value) {
const credParam = tool?.params?.credential
if (typeof credParam === 'string' && credParam && !credParam.startsWith('cred_')) {
potentialLegacyIds.add(credParam)
}
}
}
}
}

if (potentialLegacyIds.size === 0) {
return { blocks, migrated: false }
}

const conditions = [inArray(credential.accountId, [...potentialLegacyIds])]
if (workspaceId) {
conditions.push(eq(credential.workspaceId, workspaceId))
}

const rows = await db
.select({ id: credential.id, accountId: credential.accountId })
.from(credential)
.where(and(...conditions))

if (rows.length === 0) {
return { blocks, migrated: false }
}

const accountToCredential = new Map(rows.map((r) => [r.accountId!, r.id]))

const migratedBlocks = Object.fromEntries(
Object.entries(blocks).map(([blockId, block]) => {
let blockChanged = false
const newSubBlocks = { ...block.subBlocks }

for (const [subBlockId, subBlock] of Object.entries(newSubBlocks)) {
if (CREDENTIAL_SUBBLOCK_IDS.has(subBlockId) && typeof subBlock.value === 'string') {
const newId = accountToCredential.get(subBlock.value)
if (newId) {
newSubBlocks[subBlockId] = { ...subBlock, value: newId }
blockChanged = true
}
}

if (subBlockId === 'tools' && Array.isArray(subBlock.value)) {
let toolsChanged = false
const newTools = (subBlock.value as any[]).map((tool: any) => {
const credParam = tool?.params?.credential
if (typeof credParam === 'string') {
const newId = accountToCredential.get(credParam)
if (newId) {
toolsChanged = true
return { ...tool, params: { ...tool.params, credential: newId } }
}
}
return tool
})
if (toolsChanged) {
newSubBlocks[subBlockId] = { ...subBlock, value: newTools as any }
blockChanged = true
}
}
}

return [blockId, blockChanged ? { ...block, subBlocks: newSubBlocks } : block]
})
)

return { blocks: migratedBlocks, migrated: true }
Comment thread
icecrasher321 marked this conversation as resolved.
Outdated
Comment thread
icecrasher321 marked this conversation as resolved.
Outdated
}

/**
* Load workflow state from normalized tables
* Returns null if no data found (fallback to JSON blob)
Expand All @@ -193,11 +302,15 @@ export async function loadWorkflowFromNormalizedTables(
workflowId: string
): Promise<NormalizedWorkflowData | null> {
try {
// Load all components in parallel
const [blocks, edges, subflows] = await Promise.all([
const [blocks, edges, subflows, [workflowRow]] = await Promise.all([
db.select().from(workflowBlocks).where(eq(workflowBlocks.workflowId, workflowId)),
db.select().from(workflowEdges).where(eq(workflowEdges.workflowId, workflowId)),
db.select().from(workflowSubflows).where(eq(workflowSubflows.workflowId, workflowId)),
db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1),
Comment thread
icecrasher321 marked this conversation as resolved.
])

// If no blocks found, assume this workflow hasn't been migrated yet
Expand Down Expand Up @@ -236,9 +349,31 @@ export async function loadWorkflowFromNormalizedTables(
const { blocks: sanitizedBlocks } = sanitizeAgentToolsInBlocks(blocksMap)

// Migrate old agent block format (systemPrompt/userPrompt) to new messages array format
// This ensures backward compatibility for workflows created before the messages-input refactor
const migratedBlocks = migrateAgentBlocksToMessagesFormat(sanitizedBlocks)

// Migrate legacy account.id → credential.id in OAuth subblocks
const { blocks: credMigratedBlocks, migrated: credentialsMigrated } =
await migrateCredentialIds(migratedBlocks, workflowRow?.workspaceId ?? undefined)

if (credentialsMigrated) {
Promise.resolve().then(async () => {
try {
for (const [blockId, block] of Object.entries(credMigratedBlocks)) {
if (block.subBlocks !== migratedBlocks[blockId]?.subBlocks) {
await db
.update(workflowBlocks)
.set({ subBlocks: block.subBlocks, updatedAt: new Date() })
.where(
and(eq(workflowBlocks.id, blockId), eq(workflowBlocks.workflowId, workflowId))
)
}
}
} catch (err) {
logger.warn('Failed to persist credential ID migration', { workflowId, error: err })
}
})
Comment thread
icecrasher321 marked this conversation as resolved.
}

// Convert edges to the expected format
const edgesArray: Edge[] = edges.map((edge) => ({
id: edge.id,
Expand Down Expand Up @@ -275,15 +410,13 @@ export async function loadWorkflowFromNormalizedTables(
forEachItems: (config as Loop).forEachItems ?? '',
whileCondition: (config as Loop).whileCondition ?? '',
doWhileCondition: (config as Loop).doWhileCondition ?? '',
enabled: migratedBlocks[subflow.id]?.enabled ?? true,
enabled: credMigratedBlocks[subflow.id]?.enabled ?? true,
}
loops[subflow.id] = loop

// Sync block.data with loop config to ensure all fields are present
// This allows switching between loop types without losing data
if (migratedBlocks[subflow.id]) {
const block = migratedBlocks[subflow.id]
migratedBlocks[subflow.id] = {
if (credMigratedBlocks[subflow.id]) {
const block = credMigratedBlocks[subflow.id]
credMigratedBlocks[subflow.id] = {
...block,
data: {
...block.data,
Expand All @@ -304,7 +437,7 @@ export async function loadWorkflowFromNormalizedTables(
(config as Parallel).parallelType === 'collection'
? (config as Parallel).parallelType
: 'count',
enabled: migratedBlocks[subflow.id]?.enabled ?? true,
enabled: credMigratedBlocks[subflow.id]?.enabled ?? true,
}
parallels[subflow.id] = parallel
} else {
Expand All @@ -313,7 +446,7 @@ export async function loadWorkflowFromNormalizedTables(
})

return {
blocks: migratedBlocks,
blocks: credMigratedBlocks,
edges: edgesArray,
loops,
parallels,
Expand Down
Loading