diff --git a/packages/core/src/application/BatchService.ts b/packages/core/src/application/BatchService.ts index e86bcff..0292bd4 100644 --- a/packages/core/src/application/BatchService.ts +++ b/packages/core/src/application/BatchService.ts @@ -1,13 +1,13 @@ import type { BatchInputMode, + BatchPlan as BatchPlanData, ClarifyTicketId, PlanUnit, + PlanUnitId, ProposalId, SkillEvent, - SkillId, SkillRunId, Timestamp, - UserId, WorkspaceId, } from '@braidhq/schema' import type { BatchPlanRepository } from '../domain/batch/BatchPlanRepository.js' @@ -15,20 +15,22 @@ import type { Clock } from '../domain/Clock.js' import type { WorkspaceEvent } from '../domain/events/WorkspaceEvent.js' import type { ClarifyTicketRepository } from '../domain/hitl/ClarifyTicketRepository.js' import type { ProposalRepository } from '../domain/hitl/ProposalRepository.js' +import type { OntologyBatchBinding, OntologyPlugin } from '../domain/plugin/Ontology.js' +import type { PluginRegistry } from '../domain/plugin/PluginRegistry.js' import type { SkillRunner } from '../domain/skill/SkillRunner.js' import type { Workspace } from '../domain/workspace/Workspace.js' import type { HistoryService } from './HistoryService.js' import type { HITLService } from './HITLService.js' import type { PerWorkspaceLock } from './PerWorkspaceLock.js' +import type { SourceUnitStateService } from './SourceUnitStateService.js' import type { WorkspaceEventBus } from './WorkspaceEventBus.js' import type { WorkspaceService } from './WorkspaceService.js' +import { UserId } from '@braidhq/schema' import { BatchPlan } from '../domain/batch/BatchPlan.js' import { ConflictError, ValidationError } from '../domain/errors.js' import { newBatchPlanId, newPlanUnitId } from '../domain/ids.js' -const BATCH_USER_ID = 'braid-batch' as UserId -const SCAN_SKILL_ID = 'braid-scan' as SkillId -const EXTRACT_SKILL_ID = 'braid-extract' as SkillId +const BATCH_USER_ID = UserId.parse('braid-batch') export interface IntentItem { readonly value: string @@ -49,13 +51,36 @@ export interface BatchServiceDeps { batchPlanRepository: BatchPlanRepository // Filesystem walk happens in infrastructure; the orchestrator just consumes the list. intentLister: IntentLister + /** + * Resolves the workspace's ontology plugin so the service can read its + * batch binding (per-unit skill, checkpoint config, derive skill). + * Without this binding the framework has no opinion on which skills + * to dispatch. + */ + pluginRegistry: PluginRegistry eventBus?: WorkspaceEventBus workspaceLock: PerWorkspaceLock clock: Clock + /** + * Optional. When supplied, batch records a `SourceUnitState` + * observation after every successful unit extract so Reactor / + * manual paths share the same diff primitive. Absent in pure + * unit-test wiring; production composition (`composeFsApp`) always + * provides it. + */ + sourceUnitStateService?: SourceUnitStateService } export interface StartBatchOptions { autoApply: boolean + /** + * Bearer token captured from the caller's session, forwarded to every + * skill subprocess this batch spawns so they can call back into the + * server API via the braid-core MCP gateway. Absent when running in + * `BRAID_LOCAL_TRUST=true` mode (anonymous local dev) where the auth + * middleware lets unauthenticated callers through. + */ + callerToken?: string } export class BatchService { @@ -67,7 +92,9 @@ export class BatchService { return this.deps.workspaceLock.run(workspaceId, async () => { const workspace = await this.deps.workspaceService.findById(workspaceId) await this.assertNoActiveBatch(workspace) - const mode = this.resolveMode(workspace) + const ontology = this.resolveOntology(workspace) + const binding = this.requireBinding(ontology) + const mode = this.resolveMode(workspace, binding) const now = this.deps.clock.now() const baselineTag = `batch-baseline-${tagSuffix(now)}` const initialUnits = mode === 'intent' ? await this.buildIntentUnits(workspace) : [] @@ -86,6 +113,8 @@ export class BatchService { status: 'idle', autoApply: options.autoApply, units: initialUnits, + checkpointPhases: [], + batchPolicy: snapshotBatchPolicy(binding), }).beginRun(now, baselineTag) await this.tagBaseline(workspace, baselineTag, now) @@ -93,7 +122,7 @@ export class BatchService { this.publish(workspaceId, { type: 'batch.started', workspaceId, planId: plan.id, mode, at: now }) // Fire-and-forget; callers poll via getStatus or subscribe to SSE. - void this.runLoop(workspace, plan).catch(async (err: unknown) => { + void this.runLoop(workspace, plan, options.callerToken).catch(async (err: unknown) => { const failed = (await this.deps.batchPlanRepository.load(workspace)) ?.markFailed(this.deps.clock.now(), errorMessage(err)) if (failed) @@ -117,7 +146,7 @@ export class BatchService { const plan = await this.deps.batchPlanRepository.load(workspace) if (!plan) return - if (plan.status !== 'running' && plan.status !== 'scanning') + if (plan.status !== 'running' && plan.status !== 'deriving') return if (plan.running && this.deps.skillRunner.isActive(plan.running.skillRunId)) return @@ -125,7 +154,7 @@ export class BatchService { await this.deps.batchPlanRepository.save(workspace, failed) } - async resume(workspaceId: WorkspaceId): Promise { + async resume(workspaceId: WorkspaceId, options: { callerToken?: string } = {}): Promise { return this.deps.workspaceLock.run(workspaceId, async () => { const workspace = await this.deps.workspaceService.findById(workspaceId) const existing = await this.deps.batchPlanRepository.load(workspace) @@ -142,7 +171,7 @@ export class BatchService { mode: resumed.mode, at: now, }) - void this.runLoop(workspace, resumed).catch(async (err: unknown) => { + void this.runLoop(workspace, resumed, options.callerToken).catch(async (err: unknown) => { const failed = (await this.deps.batchPlanRepository.load(workspace)) ?.markFailed(this.deps.clock.now(), errorMessage(err)) if (failed) @@ -188,7 +217,8 @@ export class BatchService { // Move a terminal plan to `archived`. The Studio Batch page treats // archived the same as "no active plan" but keeps the report - // browsable via the PreStart "previous batch" slot. + // browsable via the PreStart "previous batch" slot. Recorded in git + // history so the audit trail stays continuous (`batch-archive` kind). async archive(workspaceId: WorkspaceId): Promise { return this.deps.workspaceLock.run(workspaceId, async () => { const workspace = await this.deps.workspaceService.findById(workspaceId) @@ -197,14 +227,20 @@ export class BatchService { throw new ValidationError(`No batch plan to archive on workspace "${workspaceId}"`) const archived = existing.archive(this.deps.clock.now()) await this.deps.batchPlanRepository.save(workspace, archived) + await this.deps.historyService.commitWorkspaceChange(workspace.id, { + kind: 'batch-archive', + subject: `archived ${archived.id}`, + userId: BATCH_USER_ID, + }) return archived }) } - private async runLoop(workspace: Workspace, initial: BatchPlan): Promise { + private async runLoop(workspace: Workspace, initial: BatchPlan, callerToken?: string): Promise { let plan = initial - if (plan.mode === 'scan') { - plan = await this.runScanPhase(workspace, plan) + const binding = this.requireBinding(this.resolveOntology(workspace)) + if (plan.mode === 'derive') { + plan = await this.runDerivePhase(workspace, plan, binding, callerToken) if (plan.status !== 'running') return } @@ -223,27 +259,157 @@ export class BatchService { }) return } - plan = await this.runUnit(workspace, plan, unit) + plan = await this.runUnit(workspace, plan, binding, unit, callerToken) + // Fire a checkpoint whenever the ontology's chunkSize threshold is + // crossed. A failed checkpoint fails the batch immediately so we + // don't keep dispatching unit runs into a model state we know is + // already broken. + if (binding.checkpoint) { + const unconsumed = unconsumedCompletedUnitIds(plan) + if (unconsumed.length >= binding.checkpoint.chunkSize) { + const chunkUnitIds = unconsumed.slice(0, binding.checkpoint.chunkSize) + const after = await this.runCheckpointPhase(workspace, plan, binding.checkpoint, chunkUnitIds, callerToken) + if (after.status === 'failed') + return + plan = after + } + } + } + // Optional end-of-loop checkpoint. The ontology decides whether + // to require it (`runAtEnd: true`); some ontologies might rely + // purely on per-chunk checkpoints, others might skip checkpoints + // entirely. + if (binding.checkpoint?.runAtEnd) { + const remainingUnits = unconsumedCompletedUnitIds(plan) + const after = await this.runCheckpointPhase(workspace, plan, binding.checkpoint, remainingUnits, callerToken) + if (after.status === 'failed') + return + plan = after } const now = this.deps.clock.now() - plan = plan.markCompleted(now) - await this.deps.batchPlanRepository.save(workspace, plan) + const completedPlan = plan.markCompleted(now) + await this.deps.batchPlanRepository.save(workspace, completedPlan) this.publish(workspace.id, { type: 'batch.completed', workspaceId: workspace.id, - planId: plan.id, + planId: completedPlan.id, at: now, }) } - private async runScanPhase(workspace: Workspace, _plan: BatchPlan): Promise { - const runId = await this.deps.skillRunner.start(workspace, SCAN_SKILL_ID, '') + private async runCheckpointPhase( + workspace: Workspace, + plan: BatchPlan, + checkpoint: NonNullable, + unitIds: readonly PlanUnitId[], + callerToken?: string, + ): Promise { + const unitsById = new Map(plan.units.map(u => [u.id, u] as const)) + const units = unitIds.map(id => unitsById.get(id)).filter((u): u is PlanUnit => !!u) + const extraEnv = checkpoint.extraEnv?.(units) + const hasEnv = !!extraEnv && Object.keys(extraEnv).length > 0 + let runId: SkillRunId | undefined + try { + const startedAt = this.deps.clock.now() + // Snapshot proposal ids so the post-run sweep can attribute new + // ones to this checkpoint (mirrors runUnit's set-difference). + const before = await this.snapshotIds(workspace.id) + runId = await this.deps.skillRunner.start( + workspace, + checkpoint.skillId, + '', + { + ...(hasEnv ? { extraEnv } : {}), + ...(callerToken ? { callerToken } : {}), + }, + ) + const running = plan.startCheckpointPhase(startedAt, runId, unitIds) + await this.deps.batchPlanRepository.save(workspace, running) + this.publish(workspace.id, { + type: 'batch.checkpoint.started', + workspaceId: workspace.id, + planId: running.id, + skillRunId: runId, + at: startedAt, + }) + await this.runSkillWithAutoApply(workspace, runId, plan.autoApply, before) + const completedAt = this.deps.clock.now() + const completed = running.completeCheckpointPhase(completedAt) + await this.deps.batchPlanRepository.save(workspace, completed) + this.publish(workspace.id, { + type: 'batch.checkpoint.completed', + workspaceId: workspace.id, + planId: completed.id, + skillRunId: runId, + at: completedAt, + }) + return completed + } + catch (err) { + const now = this.deps.clock.now() + const errorText = `checkpoint "${checkpoint.skillId}" failed: ${errorMessage(err)}` + const phaseFailed = (runId ? plan.startCheckpointPhase(now, runId, unitIds) : plan).failCheckpointPhase(now, errorText) + const failed = phaseFailed.markFailed(now, errorText) + await this.deps.batchPlanRepository.save(workspace, failed) + if (runId) { + this.publish(workspace.id, { + type: 'batch.checkpoint.failed', + workspaceId: workspace.id, + planId: failed.id, + skillRunId: runId, + error: errorText, + at: now, + }) + } + this.publish(workspace.id, { + type: 'batch.failed', + workspaceId: workspace.id, + planId: failed.id, + error: errorText, + at: now, + }) + return failed + } + } + + private async recordObservation( + workspace: Workspace, + unit: PlanUnit, + runId: SkillRunId, + ): Promise { + const service = this.deps.sourceUnitStateService + if (!service || !unit.sourceId || !unit.scopeHint) + return + try { + await service.recordObservation(workspace.id, unit.sourceId, unit.scopeHint, runId) + } + catch { + // Observation recording is best-effort. The extract itself + // already succeeded and failing to record it must not fail the + // batch; Reactor will see the unit as "changed" next cycle and + // re-extract, which is a recoverable state. + } + } + + private async runDerivePhase( + workspace: Workspace, + _plan: BatchPlan, + binding: OntologyBatchBinding, + callerToken?: string, + ): Promise { + if (!binding.deriveUnits) { + throw new ValidationError( + `Workspace "${workspace.id}" has no intent source and the ontology "${this.resolveOntology(workspace).ontologyId}" provides no deriveUnits skill.`, + ) + } + const skillId = binding.deriveUnits.skillId + const runId = await this.deps.skillRunner.start(workspace, skillId, '', callerToken ? { callerToken } : undefined) await waitForCompletion(this.deps.skillRunner, runId) const updated = await this.deps.batchPlanRepository.load(workspace) if (!updated) - throw new Error('batch-plan.json disappeared during scan') + throw new Error('batch-plan.json disappeared during derive') if (updated.units.length === 0) { - const failed = updated.markFailed(this.deps.clock.now(), 'braid-scan produced no units') + const failed = updated.markFailed(this.deps.clock.now(), `derive skill "${skillId}" produced no units`) await this.deps.batchPlanRepository.save(workspace, failed) return failed } @@ -252,13 +418,19 @@ export class BatchService { return promoted } - private async runUnit(workspace: Workspace, plan: BatchPlan, unit: PlanUnit): Promise { + private async runUnit(workspace: Workspace, plan: BatchPlan, binding: OntologyBatchBinding, unit: PlanUnit, callerToken?: string): Promise { const startedAt = this.deps.clock.now() let running = plan try { // Snapshot the pre-run id sets so post-run additions can be attributed to this unit. const before = await this.snapshotIds(workspace.id) - const runId = await this.deps.skillRunner.start(workspace, EXTRACT_SKILL_ID, unitArg(unit)) + const argsFor = binding.perUnit.argsFor ?? defaultUnitArg + const runId = await this.deps.skillRunner.start( + workspace, + binding.perUnit.skillId, + argsFor(unit), + callerToken ? { callerToken } : undefined, + ) running = plan.startUnit(startedAt, unit.id, { unitId: unit.id, skillRunId: runId }) await this.deps.batchPlanRepository.save(workspace, running) this.publish(workspace.id, { @@ -270,29 +442,12 @@ export class BatchService { at: startedAt, }) - // Apply mid-run instead of post-unit: extract validators can require prior proposals to be applied, which deadlocks a sweep. - const alreadyApplied = new Set() - const unsubscribe = plan.autoApply - ? this.streamApplyProposals(workspace.id, alreadyApplied) - : () => {} - - try { - await waitForCompletion(this.deps.skillRunner, runId) - } - finally { - unsubscribe() - } - - const output = await this.collectUnitOutput(workspace.id, before) - if (plan.autoApply) { - // Belt + braces: sweep anything the stream listener missed (publish race / unsubscribe ordering). - const remaining = output.proposalIds.filter(id => !alreadyApplied.has(id)) - await this.autoApply(remaining) - } + const output = await this.runSkillWithAutoApply(workspace, runId, plan.autoApply, before) const completedAt = this.deps.clock.now() const completed = running.completeUnit(completedAt, unit.id, output) await this.deps.batchPlanRepository.save(workspace, completed) + await this.recordObservation(workspace, unit, runId) this.publish(workspace.id, { type: 'batch.unit.completed', workspaceId: workspace.id, @@ -346,6 +501,35 @@ export class BatchService { } } + // Drives a single skill run end to end with autoApply wiring: streams + // proposals as they're created, waits for the subprocess to exit, then + // sweeps anything the stream missed. Shared by runUnit and + // runCheckpointPhase so both kinds of skill dispatch get the same + // autoApply treatment. + private async runSkillWithAutoApply( + workspace: Workspace, + runId: SkillRunId, + autoApply: boolean, + before: { proposals: Set, clarify: Set }, + ): Promise<{ proposalIds: ProposalId[], clarifyTicketIds: ClarifyTicketId[] }> { + const applied = new Set() + const unsubscribe = autoApply + ? this.streamApplyProposals(workspace.id, applied) + : () => {} + try { + await waitForCompletion(this.deps.skillRunner, runId) + } + finally { + unsubscribe() + } + const output = await this.collectUnitOutput(workspace.id, before) + if (autoApply) { + const remaining = output.proposalIds.filter(id => !applied.has(id)) + await this.autoApply(remaining) + } + return output + } + private streamApplyProposals(workspaceId: WorkspaceId, applied: Set): () => void { if (!this.deps.eventBus) return () => {} @@ -376,23 +560,42 @@ export class BatchService { private async assertNoActiveBatch(workspace: Workspace): Promise { const existing = await this.deps.batchPlanRepository.load(workspace) - if (existing && (existing.status === 'running' || existing.status === 'scanning')) { + if (existing && (existing.status === 'running' || existing.status === 'deriving')) { throw new ConflictError( `Workspace "${workspace.id}" already has an active batch (plan ${existing.id} status=${existing.status})`, ) } } - private resolveMode(workspace: Workspace): BatchInputMode { + private resolveMode(workspace: Workspace, binding: OntologyBatchBinding): BatchInputMode { if (workspace.intentSources().length > 0) return 'intent' - if (workspace.codeSources().length > 0) - return 'scan' + if (workspace.codeSources().length > 0) { + if (!binding.deriveUnits) { + throw new ValidationError( + `Workspace "${workspace.id}" has no intent source; ontology "${workspace.productManifest.ontologyId}" must provide a deriveUnits skill to batch from code.`, + ) + } + return 'derive' + } throw new ValidationError( `Workspace "${workspace.id}" has no intent or code sources to bootstrap from`, ) } + private resolveOntology(workspace: Workspace): OntologyPlugin { + return this.deps.pluginRegistry.requireOntology(workspace.productManifest.ontologyId) + } + + private requireBinding(ontology: OntologyPlugin): OntologyBatchBinding { + if (!ontology.batch) { + throw new ValidationError( + `Ontology "${ontology.ontologyId}" does not declare a batch binding; it cannot participate in batches.`, + ) + } + return ontology.batch + } + private async buildIntentUnits(workspace: Workspace): Promise { const items = await this.deps.intentLister(workspace) return items.map(item => ({ @@ -427,7 +630,7 @@ export class BatchService { } } -function unitArg(unit: PlanUnit): string { +function defaultUnitArg(unit: PlanUnit): string { return unit.scopeHint ?? unit.name } @@ -439,6 +642,38 @@ function errorMessage(err: unknown): string { return err instanceof Error ? err.message : String(err) } +/** + * Returns the ordered list of unit ids that are `completed` on the + * plan but have not yet been recorded as `unitIds` in any successful + * model phase. Order matches `plan.units` so the chunking inside the + * run loop processes them in extraction order. + */ +function snapshotBatchPolicy(binding: OntologyBatchBinding): NonNullable { + return { + perUnitSkillId: binding.perUnit.skillId, + ...(binding.perUnit.label ? { perUnitLabel: binding.perUnit.label } : {}), + ...(binding.checkpoint + ? { + checkpointSkillId: binding.checkpoint.skillId, + ...(binding.checkpoint.label ? { checkpointLabel: binding.checkpoint.label } : {}), + checkpointChunkSize: binding.checkpoint.chunkSize, + checkpointRunAtEnd: binding.checkpoint.runAtEnd, + } + : {}), + } +} + +function unconsumedCompletedUnitIds(plan: BatchPlan): readonly PlanUnitId[] { + const consumed = new Set() + for (const phase of plan.checkpointPhases) { + if (phase.status === 'completed') { + for (const id of phase.unitIds) + consumed.add(id) + } + } + return plan.units.filter(u => u.status === 'completed' && !consumed.has(u.id)).map(u => u.id) +} + async function waitForCompletion(runner: SkillRunner, runId: SkillRunId): Promise { await new Promise((resolve, reject) => { const sub = runner.subscribe(runId, (event: SkillEvent) => { diff --git a/packages/core/src/application/HITLService.ts b/packages/core/src/application/HITLService.ts index 6d76c89..56796a1 100644 --- a/packages/core/src/application/HITLService.ts +++ b/packages/core/src/application/HITLService.ts @@ -10,7 +10,6 @@ import type { GraphOperation, ProposalDraft, ProposalId, - UserId, ValidationIssue, WorkspaceId, } from '@braidhq/schema' @@ -26,15 +25,17 @@ import type { Workspace } from '../domain/workspace/Workspace.js' import type { ValidationService } from './ValidationService.js' import type { WorkspaceEventBus } from './WorkspaceEventBus.js' import type { WorkspaceService } from './WorkspaceService.js' +import { UserId } from '@braidhq/schema' import { ValidationError } from '../domain/errors.js' import { ClarifyTicket } from '../domain/hitl/ClarifyTicket.js' import { Proposal } from '../domain/hitl/Proposal.js' import { newClarifyCandidateId, newClarifyTicketId, newDecisionId, newProposalId } from '../domain/ids.js' import { noopUserDirectory } from '../domain/users/UserDirectory.js' +import { enrichCommitAuthor } from './enrichCommitAuthor.js' import { PerWorkspaceLock } from './PerWorkspaceLock.js' // Generic system author for submit commits until Theme 13 (account management) supplies real per-user attribution. -const SUBMIT_USER_ID = 'braid-skill' as UserId +const SUBMIT_USER_ID = UserId.parse('braid-skill') export interface HITLServiceDeps { proposalRepository: ProposalRepository @@ -365,7 +366,7 @@ export class HITLService { const snapshot = await this.deps.modelRepository.load(workspace.id) await this.deps.graphSerializer.write(workspace, snapshot) } - const enriched = await this.enrichAuthor(message) + const enriched = await enrichCommitAuthor(message, this.userDirectory) const sha = await this.deps.history.commit(workspace, enriched) this.deps.eventBus?.publish({ type: 'history.committed', @@ -374,23 +375,4 @@ export class HITLService { at: this.deps.clock.now(), }) } - - /** - * Snapshot the user's displayName + email into the commit so git - * stores the real Google identity. Skips when `userDirectory` - * doesn't know the userId — bootstrap / skill-system authors lack - * a row and fall through to the placeholder synth in the git layer. - */ - private async enrichAuthor(message: CommitMessage): Promise { - if (message.authorName !== undefined || message.authorEmail !== undefined) - return message - const author = await this.userDirectory.resolve(message.userId) - if (!author) - return message - return { - ...message, - authorName: author.displayName, - ...(author.email ? { authorEmail: author.email } : {}), - } - } } diff --git a/packages/core/src/application/HistoryService.ts b/packages/core/src/application/HistoryService.ts index fae7fdf..ac66673 100644 --- a/packages/core/src/application/HistoryService.ts +++ b/packages/core/src/application/HistoryService.ts @@ -20,6 +20,7 @@ import type { WorkspaceService } from './WorkspaceService.js' import { diffSnapshots } from '@braidhq/schema' import { ConflictError } from '../domain/errors.js' import { noopUserDirectory } from '../domain/users/UserDirectory.js' +import { enrichCommitAuthor } from './enrichCommitAuthor.js' export interface HistoryServiceDeps { history: WorkspaceHistory @@ -111,6 +112,26 @@ export class HistoryService { return this.deps.history.tag(workspace, sha, name, note) } + /** + * Commit whatever artifact changes a caller has just persisted to + * disk, with the supplied commit message. The caller is responsible + * for holding the per-workspace lock (so the commit isn't racing a + * concurrent restore) and for having already written its file + * changes. `userId` is snapshotted into the git author line. + */ + async commitWorkspaceChange(workspaceId: WorkspaceId, message: CommitMessage): Promise { + const workspace = await this.deps.workspaceService.findById(workspaceId) + const enriched = await enrichCommitAuthor(message, this.userDirectory) + const sha = await this.deps.history.commit(workspace, enriched) + this.deps.eventBus?.publish({ + type: 'history.committed', + workspaceId: workspace.id, + sha, + at: this.deps.clock.now(), + }) + return sha + } + async listTags(workspaceId: WorkspaceId): Promise { const workspace = await this.deps.workspaceService.findById(workspaceId) return this.deps.history.listTags(workspace) diff --git a/packages/core/src/application/SourceUnitStateService.ts b/packages/core/src/application/SourceUnitStateService.ts new file mode 100644 index 0000000..3d82f96 --- /dev/null +++ b/packages/core/src/application/SourceUnitStateService.ts @@ -0,0 +1,67 @@ +import type { SkillRunId, SourceId, SourceUnitDiff, SourceUnitState, WorkspaceId } from '@braidhq/schema' +import type { Clock } from '../domain/Clock.js' +import type { SourceUnitDigest } from '../domain/source/SourceUnitDigest.js' +import type { SourceUnitStateRepository } from '../domain/source/SourceUnitStateRepository.js' +import type { WorkspaceService } from './WorkspaceService.js' +import { computeSourceUnitDiff } from '../domain/source/computeSourceUnitDiff.js' + +export interface SourceUnitStateServiceDeps { + repository: SourceUnitStateRepository + digest: SourceUnitDigest + workspaceService: WorkspaceService + clock: Clock +} + +/** + * Application-level entry point for recording and querying source unit + * observations. Every orchestrator that runs a skill against a source + * unit (BatchService today, ReactorService and manual-extract dispatch + * later) goes through `recordObservation` so the audit and diff + * primitives stay in one place. + * + * Reads (`listByWorkspace`, `diffAgainst`) are delegated to the + * repository / pure diff function. The service exists mostly to (a) + * fan-in writes with sha computation, and (b) give callers a stable + * surface even if persistence later moves to a database. + */ +export class SourceUnitStateService { + constructor(private readonly deps: SourceUnitStateServiceDeps) {} + + async recordObservation( + workspaceId: WorkspaceId, + sourceId: SourceId, + path: string, + runId?: SkillRunId, + ): Promise { + const workspace = await this.deps.workspaceService.findById(workspaceId) + const sha = await this.deps.digest.computeSha(workspace, sourceId, path) + const state: SourceUnitState = { + workspaceId, + sourceId, + path, + lastObservedSha: sha, + lastObservedAt: this.deps.clock.now(), + ...(runId ? { lastObservedByRunId: runId } : {}), + } + await this.deps.repository.save(state) + return state + } + + async listByWorkspace(workspaceId: WorkspaceId): Promise { + return this.deps.repository.listByWorkspace(workspaceId) + } + + async listBySource(workspaceId: WorkspaceId, sourceId: SourceId): Promise { + return this.deps.repository.listBySource(workspaceId, sourceId) + } + + /** + * Compute the partition of `units` against what's currently recorded + * for this workspace. Used by Reactor to decide which units to + * re-extract; useful in tests for asserting batch progress. + */ + async diffAgainst(workspaceId: WorkspaceId, units: ReadonlyArray<{ sourceId: SourceId, path: string, sha: SourceUnitState['lastObservedSha'] }>): Promise { + const states = await this.deps.repository.listByWorkspace(workspaceId) + return computeSourceUnitDiff(states, units) + } +} diff --git a/packages/core/src/application/enrichCommitAuthor.ts b/packages/core/src/application/enrichCommitAuthor.ts new file mode 100644 index 0000000..be456be --- /dev/null +++ b/packages/core/src/application/enrichCommitAuthor.ts @@ -0,0 +1,24 @@ +import type { CommitMessage } from '@braidhq/schema' +import type { UserDirectory } from '../domain/users/UserDirectory.js' + +/** + * Resolve the human author for a commit and snapshot `displayName` / + * `email` into the message so a later rename of the user record can't + * retroactively rewrite git history. Pass-through when the caller has + * already populated those fields, or when the directory has no record. + */ +export async function enrichCommitAuthor( + message: CommitMessage, + userDirectory: UserDirectory, +): Promise { + if (message.authorName !== undefined || message.authorEmail !== undefined) + return message + const author = await userDirectory.resolve(message.userId) + if (!author) + return message + return { + ...message, + authorName: author.displayName, + ...(author.email ? { authorEmail: author.email } : {}), + } +} diff --git a/packages/core/src/domain/batch/BatchPlan.ts b/packages/core/src/domain/batch/BatchPlan.ts index f018d85..8ba420d 100644 --- a/packages/core/src/domain/batch/BatchPlan.ts +++ b/packages/core/src/domain/batch/BatchPlan.ts @@ -1,4 +1,5 @@ import type { + BatchCheckpointPhase, BatchInputMode, BatchPlan as BatchPlanData, BatchPlanId, @@ -8,6 +9,7 @@ import type { PlanUnit, PlanUnitId, ProposalId, + SkillRunId, Timestamp, WorkspaceId, } from '@braidhq/schema' @@ -30,24 +32,25 @@ export class BatchPlan { get units(): readonly PlanUnit[] { return this.data.units } get running(): BatchRunning | undefined { return this.data.running } get error(): string | undefined { return this.data.error } + get checkpointPhases(): readonly BatchCheckpointPhase[] { return this.data.checkpointPhases } get createdAt(): Timestamp { return this.data.createdAt } get updatedAt(): Timestamp { return this.data.updatedAt } - // mode='scan' goes idle → scanning → running. mode='intent' goes idle → running. + // mode='derive' goes idle → deriving → running. mode='intent' goes idle → running. beginRun(now: Timestamp, baselineTag: string): BatchPlan { if (this.data.status !== 'idle') throw new ConflictError(`Batch plan ${this.data.id} is already ${this.data.status}`) return this.with({ - status: this.data.mode === 'scan' ? 'scanning' : 'running', + status: this.data.mode === 'derive' ? 'deriving' : 'running', baselineTag, updatedAt: now, }) } - // Called when braid-scan finished writing units back into the plan. + // Called when the ontology's `deriveUnits` skill finishes writing units back into the plan. promoteToRunning(now: Timestamp, units: readonly PlanUnit[]): BatchPlan { - if (this.data.status !== 'scanning') - throw new ConflictError(`Batch plan ${this.data.id} is not scanning (status=${this.data.status})`) + if (this.data.status !== 'deriving') + throw new ConflictError(`Batch plan ${this.data.id} is not deriving (status=${this.data.status})`) return this.with({ status: 'running', units: [...units], updatedAt: now }) } @@ -104,6 +107,41 @@ export class BatchPlan { return this.with({ status: 'stopped', running: undefined, updatedAt: now }) } + startCheckpointPhase(now: Timestamp, skillRunId: SkillRunId, unitIds: readonly PlanUnitId[]): BatchPlan { + const newPhase: BatchCheckpointPhase = { + status: 'running', + unitIds: [...unitIds], + startedAt: now, + skillRunId, + } + return this.with({ + checkpointPhases: [...this.data.checkpointPhases, newPhase], + updatedAt: now, + }) + } + + completeCheckpointPhase(now: Timestamp): BatchPlan { + return this.with({ + checkpointPhases: this.mapLastCheckpointPhase(phase => ({ ...phase, status: 'completed', completedAt: now })), + updatedAt: now, + }) + } + + failCheckpointPhase(now: Timestamp, error: string): BatchPlan { + return this.with({ + checkpointPhases: this.mapLastCheckpointPhase(phase => ({ ...phase, status: 'failed', completedAt: now, error })), + updatedAt: now, + }) + } + + private mapLastCheckpointPhase(fn: (phase: BatchCheckpointPhase) => BatchCheckpointPhase): BatchCheckpointPhase[] { + const arr = [...this.data.checkpointPhases] + if (arr.length === 0) + return arr + arr[arr.length - 1] = fn(arr[arr.length - 1]!) + return arr + } + // User-driven dismiss after reviewing the report. Allowed only from a // terminal state so an in-flight batch can't be hidden by accident. // Archived plans stay on disk; the UI treats them like "no active plan" @@ -119,14 +157,20 @@ export class BatchPlan { if (!this.isTerminal()) throw new ConflictError(`Cannot resume plan ${this.data.id} from status=${this.data.status}`) const units = this.data.units.map(unit => unit.status === 'failed' || unit.status === 'pending' ? resetUnit(unit) : unit) - const patch: Partial = { + // Drop failed checkpoint phases so the upcoming chunk accounting + // resets; successful phases stay because the units they consumed + // are still recorded as completed. + const checkpointPhases = this.data.checkpointPhases.filter(p => p.status === 'completed') + const next: BatchPlanData = { + ...this.data, status: 'running', - running: undefined, units, + checkpointPhases, updatedAt: now, } - delete patch.error - return this.with(patch) + delete (next as { running?: BatchRunning }).running + delete (next as { error?: string }).error + return new BatchPlan(next) } toData(): BatchPlanData { diff --git a/packages/core/src/domain/events/WorkspaceEvent.ts b/packages/core/src/domain/events/WorkspaceEvent.ts index 1485cf1..47801a3 100644 --- a/packages/core/src/domain/events/WorkspaceEvent.ts +++ b/packages/core/src/domain/events/WorkspaceEvent.ts @@ -41,6 +41,9 @@ export type WorkspaceEvent = | BatchCompletedEvent | BatchStoppedEvent | BatchFailedEvent + | BatchCheckpointStartedEvent + | BatchCheckpointCompletedEvent + | BatchCheckpointFailedEvent export interface RunStartedEvent { readonly type: 'run.started' @@ -190,4 +193,29 @@ export interface BatchFailedEvent { readonly at: string } +export interface BatchCheckpointStartedEvent { + readonly type: 'batch.checkpoint.started' + readonly workspaceId: WorkspaceId + readonly planId: BatchPlanId + readonly skillRunId: SkillRunId + readonly at: string +} + +export interface BatchCheckpointCompletedEvent { + readonly type: 'batch.checkpoint.completed' + readonly workspaceId: WorkspaceId + readonly planId: BatchPlanId + readonly skillRunId: SkillRunId + readonly at: string +} + +export interface BatchCheckpointFailedEvent { + readonly type: 'batch.checkpoint.failed' + readonly workspaceId: WorkspaceId + readonly planId: BatchPlanId + readonly skillRunId: SkillRunId + readonly error: string + readonly at: string +} + export type WorkspaceEventType = WorkspaceEvent['type'] diff --git a/packages/core/src/domain/plugin/Ontology.ts b/packages/core/src/domain/plugin/Ontology.ts index 38358ff..105aa88 100644 --- a/packages/core/src/domain/plugin/Ontology.ts +++ b/packages/core/src/domain/plugin/Ontology.ts @@ -1,4 +1,4 @@ -import type { EdgeTypeId, ModelSnapshot, NodeStatus, NodeTypeId, OntologyId, ValidationIssue } from '@braidhq/schema' +import type { EdgeTypeId, ModelSnapshot, NodeStatus, NodeTypeId, OntologyId, PlanUnit, SkillId, ValidationIssue } from '@braidhq/schema' import type { Plugin } from './Plugin.js' export interface NodeTypeDescriptor { @@ -71,6 +71,44 @@ export interface OntologyValidator { validate: (snapshot: ModelSnapshot) => Promise } +/** Skill dispatched against one intent unit, plus its UI label and arg builder. */ +export interface OntologyPerUnitBinding { + readonly skillId: SkillId + /** UI badge text per unit row. Falls back to `skillId` when omitted. */ + readonly label?: string + /** Defaults to `unit.scopeHint ?? unit.name`. */ + readonly argsFor?: (unit: PlanUnit) => string +} + +/** Cross-unit hook fired between unit runs. Omit for a pure per-unit batch. */ +export interface OntologyCheckpointBinding { + readonly skillId: SkillId + /** UI badge text per checkpoint row. Falls back to `skillId` when omitted. */ + readonly label?: string + /** Fire after every N successful per-unit runs. */ + readonly chunkSize: number + /** Run an extra checkpoint at the end of the loop even when units divide evenly. */ + readonly runAtEnd: boolean + /** Env vars the checkpoint skill reads (e.g. DDD's `BRAID_CHANGED_UNITS`). */ + readonly extraEnv?: (units: readonly PlanUnit[]) => Record +} + +/** Discovery skill that produces a unit list when the workspace has no intent source. */ +export interface OntologyDeriveUnitsBinding { + readonly skillId: SkillId +} + +/** + * Ontology's contract with framework-level batch / reactor processes. + * Framework owns "what to dispatch and when"; ontology owns skill IDs + * and env contracts. Optional fields let an ontology opt out of phases. + */ +export interface OntologyBatchBinding { + readonly perUnit: OntologyPerUnitBinding + readonly checkpoint?: OntologyCheckpointBinding + readonly deriveUnits?: OntologyDeriveUnitsBinding +} + export interface OntologyPlugin extends Plugin { readonly type: 'ontology' readonly ontologyId: OntologyId @@ -81,4 +119,10 @@ export interface OntologyPlugin extends Plugin { * configured to use this ontology. Populated by `defineOntology()`. */ readonly validators: readonly OntologyValidator[] + /** + * Wiring for batch / reactor orchestration. Required for any + * ontology that wants to participate in batches; framework refuses + * to start a batch if this is absent on the workspace's ontology. + */ + readonly batch?: OntologyBatchBinding } diff --git a/packages/core/src/domain/skill/SkillRunner.ts b/packages/core/src/domain/skill/SkillRunner.ts index 602ff0b..a6a3a54 100644 --- a/packages/core/src/domain/skill/SkillRunner.ts +++ b/packages/core/src/domain/skill/SkillRunner.ts @@ -8,6 +8,21 @@ export interface SkillRunOptions { * `--resume ` so the model keeps its context. */ readonly resumeSessionId?: string + /** + * Extra environment variables merged into the spawned skill's env. + * Used by orchestration code (e.g. `BatchService` passing + * `BRAID_CHANGED_UNITS` to `braid-model`) when the single positional + * `args` string is already spoken for. + */ + readonly extraEnv?: Readonly> + /** + * Bearer token the spawned subprocess should use to call back into + * Braid's REST API via the `braid-core` MCP gateway. Route handlers + * forward the caller's session token here so the agent inherits the + * user's permissions. Absent in `BRAID_LOCAL_TRUST=true` mode where + * the server lets anonymous traffic through. + */ + readonly callerToken?: string } export type SkillEventListener = (event: SkillEvent) => void diff --git a/packages/core/src/domain/source/SourceUnitDigest.ts b/packages/core/src/domain/source/SourceUnitDigest.ts new file mode 100644 index 0000000..10e7a8a --- /dev/null +++ b/packages/core/src/domain/source/SourceUnitDigest.ts @@ -0,0 +1,16 @@ +import type { SourceId, SourceUnitSha } from '@braidhq/schema' +import type { Workspace } from '../workspace/Workspace.js' + +/** + * Computes a stable content fingerprint for one source unit on disk. + * Separate from the repository because the hashing strategy is + * loader-specific: a filesystem source walks the directory; an MCP + * source might delegate to the server's etag; etc. + */ +export interface SourceUnitDigest { + computeSha: ( + workspace: Workspace, + sourceId: SourceId, + path: string, + ) => Promise +} diff --git a/packages/core/src/domain/source/SourceUnitStateRepository.ts b/packages/core/src/domain/source/SourceUnitStateRepository.ts new file mode 100644 index 0000000..d1061cc --- /dev/null +++ b/packages/core/src/domain/source/SourceUnitStateRepository.ts @@ -0,0 +1,27 @@ +import type { SourceId, SourceUnitState, WorkspaceId } from '@braidhq/schema' + +/** + * Repository port for `SourceUnitState`. CRUD shape matching the + * existing `ProposalRepository` / `ClarifyTicketRepository` pattern so + * a SQL-backed implementation can swap in without touching the domain. + * + * Identity is the composite `(workspaceId, sourceId, path)`. No + * delete: orphaned states stay in the store as audit trace; pruning is + * deferred to a future GC step. + */ +export interface SourceUnitStateRepository { + find: ( + workspaceId: WorkspaceId, + sourceId: SourceId, + path: string, + ) => Promise + + save: (state: SourceUnitState) => Promise + + listByWorkspace: (workspaceId: WorkspaceId) => Promise + + listBySource: ( + workspaceId: WorkspaceId, + sourceId: SourceId, + ) => Promise +} diff --git a/packages/core/src/domain/source/computeSourceUnitDiff.ts b/packages/core/src/domain/source/computeSourceUnitDiff.ts new file mode 100644 index 0000000..5a2dab6 --- /dev/null +++ b/packages/core/src/domain/source/computeSourceUnitDiff.ts @@ -0,0 +1,49 @@ +import type { SourceUnit, SourceUnitDiff, SourceUnitState } from '@braidhq/schema' + +function key(sourceId: string, path: string): string { + return `${sourceId}::${path}` +} + +/** + * Pure partition: given the current set of on-disk units and the + * existing recorded states, classify each unit as new / changed / + * unchanged, and call out any state entries whose unit is no longer on + * disk (orphaned). + * + * Both inputs are scoped to a single workspace by the caller; this + * function does not filter on workspaceId. + */ +export function computeSourceUnitDiff( + states: readonly SourceUnitState[], + units: readonly SourceUnit[], +): SourceUnitDiff { + const byKey = new Map() + for (const state of states) + byKey.set(key(state.sourceId, state.path), state) + + const seen = new Set() + const result: SourceUnitDiff = { + new: [], + changed: [], + unchanged: [], + orphaned: [], + } + for (const unit of units) { + const k = key(unit.sourceId, unit.path) + seen.add(k) + const state = byKey.get(k) + if (!state) { + result.new.push(unit) + continue + } + if (state.lastObservedSha === unit.sha) + result.unchanged.push(unit) + else + result.changed.push(unit) + } + for (const [k, state] of byKey.entries()) { + if (!seen.has(k)) + result.orphaned.push(state) + } + return result +} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index ab893c6..41a8b82 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -1,9 +1,11 @@ export * from './application/BatchService.js' +export * from './application/enrichCommitAuthor.js' export * from './application/HistoryService.js' export * from './application/HITLService.js' export * from './application/ModelService.js' export * from './application/PerWorkspaceLock.js' export * from './application/SourceLoaderRunner.js' +export * from './application/SourceUnitStateService.js' export * from './application/ValidationService.js' export * from './application/WorkspaceBootstrap.js' export * from './application/WorkspaceEventBus.js' @@ -37,6 +39,9 @@ export * from './domain/skill/RunRepository.js' export * from './domain/skill/SkillManifest.js' export * from './domain/skill/SkillRegistry.js' export * from './domain/skill/SkillRunner.js' +export * from './domain/source/computeSourceUnitDiff.js' +export * from './domain/source/SourceUnitDigest.js' +export * from './domain/source/SourceUnitStateRepository.js' export * from './domain/users/UserDirectory.js' export * from './domain/workspace/Workspace.js' export * from './domain/workspace/WorkspaceRepository.js' @@ -45,6 +50,7 @@ export * from './infrastructure/in-memory/InMemoryDecisionRepository.js' export * from './infrastructure/in-memory/InMemoryKeyedStore.js' export * from './infrastructure/in-memory/InMemoryModelRepository.js' export * from './infrastructure/in-memory/InMemoryProposalRepository.js' +export * from './infrastructure/in-memory/InMemorySourceUnitStateRepository.js' export * from './infrastructure/in-memory/InMemoryWorkspaceEventBus.js' export * from './infrastructure/in-memory/InMemoryWorkspaceRepository.js' export * from './infrastructure/in-memory/NoopRunRepository.js' diff --git a/packages/core/src/infrastructure/in-memory/InMemorySourceUnitStateRepository.ts b/packages/core/src/infrastructure/in-memory/InMemorySourceUnitStateRepository.ts new file mode 100644 index 0000000..ba1f2cf --- /dev/null +++ b/packages/core/src/infrastructure/in-memory/InMemorySourceUnitStateRepository.ts @@ -0,0 +1,31 @@ +import type { SourceId, SourceUnitState, WorkspaceId } from '@braidhq/schema' +import type { SourceUnitStateRepository } from '../../domain/source/SourceUnitStateRepository.js' + +function key(workspaceId: WorkspaceId, sourceId: SourceId, path: string): string { + return `${workspaceId}::${sourceId}::${path}` +} + +/** + * Default `SourceUnitStateRepository` for tests and the in-memory + * `composeApp()` wiring. Composite-key lookup by + * `(workspaceId, sourceId, path)`. + */ +export class InMemorySourceUnitStateRepository implements SourceUnitStateRepository { + private readonly store = new Map() + + async find(workspaceId: WorkspaceId, sourceId: SourceId, path: string): Promise { + return this.store.get(key(workspaceId, sourceId, path)) ?? null + } + + async save(state: SourceUnitState): Promise { + this.store.set(key(state.workspaceId, state.sourceId, state.path), state) + } + + async listByWorkspace(workspaceId: WorkspaceId): Promise { + return [...this.store.values()].filter(s => s.workspaceId === workspaceId) + } + + async listBySource(workspaceId: WorkspaceId, sourceId: SourceId): Promise { + return [...this.store.values()].filter(s => s.workspaceId === workspaceId && s.sourceId === sourceId) + } +} diff --git a/packages/core/test/application/BatchService.test.ts b/packages/core/test/application/BatchService.test.ts index 7b05f3c..e695a35 100644 --- a/packages/core/test/application/BatchService.test.ts +++ b/packages/core/test/application/BatchService.test.ts @@ -8,21 +8,26 @@ import type { SkillRunId, SourceDescriptor, SourceId, + SourceUnitSha, TagMeta, UserId, WorkspaceId, } from '@braidhq/schema' -import type { BatchPlan, BatchPlanRepository, HistoryService, HITLService, SkillEventListener, SkillRunner, SkillRunSubscription, Workspace } from '../../src/index.js' -import { FixedClock, makeWorkspace, mintTestId, resetTestIds, T0 } from '@braidhq/test-utils' +import type { BatchPlan, BatchPlanRepository, HistoryService, HITLService, SkillEventListener, SkillRunner, SkillRunOptions, SkillRunSubscription, SourceUnitDigest, Workspace } from '../../src/index.js' +import { SkillId as SkillIdSchema } from '@braidhq/schema' +import { FixedClock, makeOntology, makeWorkspace, mintTestId, resetTestIds, T0 } from '@braidhq/test-utils' import { beforeEach, describe, expect, it, vi } from 'vitest' import { BatchService, ConflictError, InMemoryClarifyTicketRepository, InMemoryProposalRepository, + InMemorySourceUnitStateRepository, InMemoryWorkspaceRepository, PerWorkspaceLock, + PluginRegistry, Proposal, + SourceUnitStateService, ValidationError, WorkspaceService, } from '../../src/index.js' @@ -30,7 +35,7 @@ import { // Minimal fakes for the ports we need beyond the existing in-memory ones. class FakeSkillRunner implements SkillRunner { - readonly startCalls: Array<{ skillId: SkillId, args: string }> = [] + readonly startCalls: Array<{ skillId: SkillId, args: string, options?: SkillRunOptions }> = [] // Per-call exit code (default 0). Override by pushing to `exitCodes`. exitCodes: number[] = [] // Hook fired AFTER start resolves, BEFORE completed event — lets tests @@ -38,9 +43,9 @@ class FakeSkillRunner implements SkillRunner { // attributes to this unit via set-difference. onStart?: (skillId: SkillId, runId: SkillRunId) => Promise - async start(_workspace: Workspace, skillId: SkillId, args: string): Promise { + async start(_workspace: Workspace, skillId: SkillId, args: string, options?: SkillRunOptions): Promise { const runId = `r-${this.startCalls.length}` as SkillRunId - this.startCalls.push({ skillId, args }) + this.startCalls.push({ skillId, args, ...(options ? { options } : {}) }) // Defer via setTimeout so the orchestrator's subsequent `subscribe` // registers before `completed` fires (queueMicrotask is too eager). setTimeout(async () => { @@ -92,6 +97,7 @@ function fakeHistoryService(): HistoryService & { tagCalls: TagMeta[] } { tagCalls.push(tag) return tag }), + commitWorkspaceChange: vi.fn(async (): Promise => '1'.repeat(40) as CommitSha), tagCalls, } as unknown as HistoryService & { tagCalls: TagMeta[] } } @@ -127,9 +133,18 @@ function codeSource(id: string): SourceDescriptor { } } +class FakeSourceUnitDigest implements SourceUnitDigest { + readonly calls: Array<{ sourceId: string, path: string }> = [] + async computeSha(_workspace: Workspace, sourceId: SourceId, path: string): Promise { + this.calls.push({ sourceId, path }) + return `${'0'.repeat(63)}${(this.calls.length % 10).toString()}` as SourceUnitSha + } +} + async function setup(options: { sources?: readonly SourceDescriptor[] autoApply?: boolean + withObservations?: boolean } = {}) { resetTestIds() const workspaceRepo = new InMemoryWorkspaceRepository() @@ -140,6 +155,26 @@ async function setup(options: { await workspaceRepo.save(workspace) const workspaceService = new WorkspaceService({ workspaceRepository: workspaceRepo }) + // Register a DDD-like ontology with the batch binding the production + // ontology declares. Tests assert on the resulting skill ids. + const pluginRegistry = new PluginRegistry() + await pluginRegistry.register(makeOntology({ + ontologyId: 'ddd', + batch: { + perUnit: { skillId: SkillIdSchema.parse('braid-extract') }, + checkpoint: { + skillId: SkillIdSchema.parse('braid-model'), + chunkSize: 5, + runAtEnd: true, + extraEnv: (units) => { + const hint = units.filter(u => u.sourceId && u.scopeHint).map(u => `${u.sourceId}::${u.scopeHint}`).join('\n') + return hint ? { BRAID_CHANGED_UNITS: hint } : {} + }, + }, + deriveUnits: { skillId: SkillIdSchema.parse('braid-scan') }, + }, + })) + const proposalRepository = new InMemoryProposalRepository() const clarifyRepository = new InMemoryClarifyTicketRepository() const planRepository = new InMemoryBatchPlanRepository() @@ -148,6 +183,17 @@ async function setup(options: { const hitl = fakeHitlService() const clock = new FixedClock() + const sourceUnitStateRepository = new InMemorySourceUnitStateRepository() + const sourceUnitDigest = new FakeSourceUnitDigest() + const sourceUnitStateService = options.withObservations + ? new SourceUnitStateService({ + repository: sourceUnitStateRepository, + digest: sourceUnitDigest, + workspaceService, + clock, + }) + : undefined + const service = new BatchService({ workspaceService, skillRunner, @@ -167,9 +213,22 @@ async function setup(options: { }, workspaceLock: new PerWorkspaceLock(), clock, + pluginRegistry, + ...(sourceUnitStateService ? { sourceUnitStateService } : {}), }) - return { service, workspace, proposalRepository, planRepository, skillRunner, history, hitl, clock } + return { + service, + workspace, + proposalRepository, + planRepository, + skillRunner, + history, + hitl, + clock, + sourceUnitStateRepository, + sourceUnitDigest, + } } async function flushBatch(planRepository: InMemoryBatchPlanRepository): Promise { @@ -222,7 +281,11 @@ describe('BatchService', () => { expect(final.mode).toBe('intent') expect(final.units).toHaveLength(2) expect(final.units.every(u => u.status === 'completed')).toBe(true) - expect(skillRunner.startCalls.map(c => c.skillId)).toEqual(['braid-extract', 'braid-extract']) + expect(skillRunner.startCalls.map(c => c.skillId)).toEqual([ + 'braid-extract', + 'braid-extract', + 'braid-model', + ]) expect(final.units[0]!.proposalIds).toEqual(['p-1']) expect(final.units[1]!.proposalIds).toEqual(['p-2']) }) @@ -238,7 +301,9 @@ describe('BatchService', () => { await service.start(workspace.id, { autoApply: true }) await flushBatch(planRepository) - expect(hitl.applyCalls).toEqual(['p-1', 'p-2']) + // 2 extracts + 1 final checkpoint = 3 skill runs, each produces a + // fresh proposal; with autoApply on, all three get applied. + expect(hitl.applyCalls).toEqual(['p-1', 'p-2', 'p-3']) }) it('marks a unit failed when extract exits non-zero, continues to next', async () => { @@ -269,14 +334,14 @@ describe('BatchService', () => { await expect(service.start(workspace.id, { autoApply: false })).rejects.toThrow(ConflictError) }) - it('chooses scan mode when no intent sources exist', async () => { + it('chooses derive mode when no intent sources exist and runs the ontology deriveUnits skill', async () => { const { service, workspace, planRepository, skillRunner } = await setup({ sources: [codeSource('codebase')], }) await service.start(workspace.id, { autoApply: false }) - // The orchestrator runs scan in the background; assert the kick-off and mode without driving the loop to completion. + // The orchestrator runs the derive skill in the background; assert the kick-off and mode without driving the loop to completion. expect(skillRunner.startCalls[0]?.skillId).toBe('braid-scan') - expect((await planRepository.load())?.mode).toBe('scan') + expect((await planRepository.load())?.mode).toBe('derive') }) it('archive moves a completed plan to archived status', async () => { @@ -294,4 +359,115 @@ describe('BatchService', () => { const { service, workspace } = await setup() await expect(service.archive(workspace.id)).rejects.toThrow(ValidationError) }) + + it('runs braid-model once after the extract loop and passes BRAID_CHANGED_UNITS', async () => { + const { service, workspace, planRepository, skillRunner } = await setup() + await service.start(workspace.id, { autoApply: false }) + const final = await flushBatch(planRepository) + + expect(final.status).toBe('completed') + const modelCalls = skillRunner.startCalls.filter(c => c.skillId === 'braid-model') + expect(modelCalls).toHaveLength(1) + expect(modelCalls[0]!.args).toBe('') + const env = modelCalls[0]!.options?.extraEnv + expect(env).toBeDefined() + const lines = (env!.BRAID_CHANGED_UNITS ?? '').split('\n') + expect(lines).toHaveLength(2) + expect(lines.every(l => /^[^:]+::[^:]+\/$/.test(l))).toBe(true) + }) + + it('marks the plan failed when the checkpoint skill exits non-zero', async () => { + const { service, workspace, planRepository, skillRunner } = await setup() + skillRunner.exitCodes = [0, 0, 1] + await service.start(workspace.id, { autoApply: false }) + const final = await flushBatch(planRepository) + + expect(final.status).toBe('failed') + expect(final.error).toMatch(/checkpoint "braid-model" failed/) + expect(final.units.every(u => u.status === 'completed')).toBe(true) + }) + + it('records a SourceUnitState observation per completed unit when service is wired', async () => { + const { service, workspace, planRepository, sourceUnitStateRepository, sourceUnitDigest } = await setup({ withObservations: true }) + await service.start(workspace.id, { autoApply: false }) + await flushBatch(planRepository) + + const states = await sourceUnitStateRepository.listByWorkspace(workspace.id) + expect(states).toHaveLength(2) + const paths = states.map(s => s.path).sort() + expect(paths).toEqual(['design/', 'prd/']) + for (const state of states) { + expect(state.lastObservedSha).toMatch(/^[a-f0-9]{64}$/) + expect(state.lastObservedAt).toBe(T0) + expect(state.lastObservedByRunId).toBeDefined() + } + // Digest was consulted for each extracted unit. + expect(sourceUnitDigest.calls).toHaveLength(2) + }) + + it('does not record observations when the service is absent (in-memory composeApp default)', async () => { + const { service, workspace, planRepository } = await setup() + await service.start(workspace.id, { autoApply: false }) + const final = await flushBatch(planRepository) + expect(final.status).toBe('completed') + // Nothing to assert on the state store — the service was not wired. + }) + + it('chunks braid-model every 5 successful extracts and runs a final partial chunk', async () => { + // 7 intent sources => 7 units => one full chunk (5) + one partial (2). + const sources = Array.from({ length: 7 }, (_, i) => intentSource(`src-${i}`)) + const { service, workspace, planRepository, skillRunner } = await setup({ sources }) + + await service.start(workspace.id, { autoApply: false }) + const final = await flushBatch(planRepository) + + expect(final.status).toBe('completed') + expect(final.units).toHaveLength(7) + const skillIds = skillRunner.startCalls.map(c => c.skillId) + expect(skillIds).toEqual([ + 'braid-extract', + 'braid-extract', + 'braid-extract', + 'braid-extract', + 'braid-extract', + 'braid-model', + 'braid-extract', + 'braid-extract', + 'braid-model', + ]) + expect(final.checkpointPhases).toHaveLength(2) + expect(final.checkpointPhases[0]!.unitIds).toHaveLength(5) + expect(final.checkpointPhases[1]!.unitIds).toHaveLength(2) + expect(final.checkpointPhases.every(p => p.status === 'completed')).toBe(true) + }) + + it('always runs a final model pass even when chunks divide evenly', async () => { + // Exactly 5 units = one full chunk. We still want a final model. + const sources = Array.from({ length: 5 }, (_, i) => intentSource(`src-${i}`)) + const { service, workspace, planRepository, skillRunner } = await setup({ sources }) + + await service.start(workspace.id, { autoApply: false }) + const final = await flushBatch(planRepository) + + expect(final.status).toBe('completed') + const modelCount = skillRunner.startCalls.filter(c => c.skillId === 'braid-model').length + expect(modelCount).toBe(2) + expect(final.checkpointPhases).toHaveLength(2) + expect(final.checkpointPhases[0]!.unitIds).toHaveLength(5) + expect(final.checkpointPhases[1]!.unitIds).toHaveLength(0) + }) + + it('records checkpointPhases entries with skillRunId, startedAt, completedAt', async () => { + const { service, workspace, planRepository } = await setup() + await service.start(workspace.id, { autoApply: false }) + const final = await flushBatch(planRepository) + + expect(final.checkpointPhases).toHaveLength(1) + const phase = final.checkpointPhases[0]! + expect(phase.status).toBe('completed') + expect(phase.skillRunId).toBeDefined() + expect(phase.startedAt).toBeDefined() + expect(phase.completedAt).toBeDefined() + expect(phase.unitIds).toHaveLength(2) + }) }) diff --git a/packages/core/test/domain/batch/BatchPlan.test.ts b/packages/core/test/domain/batch/BatchPlan.test.ts index d268795..1dfae2a 100644 --- a/packages/core/test/domain/batch/BatchPlan.test.ts +++ b/packages/core/test/domain/batch/BatchPlan.test.ts @@ -42,6 +42,7 @@ function makePlan(overrides: Partial = {}): BatchPlan { status: 'idle', autoApply: false, units: [makeUnit(unitA, 'prd'), makeUnit(unitB, 'design')], + checkpointPhases: [], ...overrides, }) } @@ -55,9 +56,9 @@ describe('BatchPlan', () => { expect(plan.updatedAt).toBe(T1) }) - it('scan-mode goes idle → scanning', () => { - const plan = makePlan({ mode: 'scan', units: [] }).beginRun(T1, 'tag') - expect(plan.status).toBe('scanning') + it('derive-mode goes idle → deriving', () => { + const plan = makePlan({ mode: 'derive', units: [] }).beginRun(T1, 'tag') + expect(plan.status).toBe('deriving') }) it('refuses to start a plan that is not idle', () => { @@ -67,14 +68,14 @@ describe('BatchPlan', () => { }) describe('promoteToRunning', () => { - it('replaces units and moves scanning → running', () => { - const plan = makePlan({ mode: 'scan', status: 'scanning', units: [] }) + it('replaces units and moves deriving → running', () => { + const plan = makePlan({ mode: 'derive', status: 'deriving', units: [] }) const promoted = plan.promoteToRunning(T1, [makeUnit(unitA, 'orders')]) expect(promoted.status).toBe('running') expect(promoted.units.map(u => u.id)).toEqual([unitA]) }) - it('refuses unless current status is scanning', () => { + it('refuses unless current status is deriving', () => { const plan = makePlan({ status: 'running' }) expect(() => plan.promoteToRunning(T1, [])).toThrow(ConflictError) }) @@ -132,7 +133,7 @@ describe('BatchPlan', () => { it('rejects archive on non-terminal status', () => { expect(() => makePlan({ status: 'running' }).archive(T2)).toThrow(/Cannot archive/) - expect(() => makePlan({ status: 'scanning' }).archive(T2)).toThrow(/Cannot archive/) + expect(() => makePlan({ status: 'deriving' }).archive(T2)).toThrow(/Cannot archive/) expect(() => makePlan({ status: 'idle' }).archive(T2)).toThrow(/Cannot archive/) }) diff --git a/packages/core/test/domain/computeSourceUnitDiff.test.ts b/packages/core/test/domain/computeSourceUnitDiff.test.ts new file mode 100644 index 0000000..01be5a7 --- /dev/null +++ b/packages/core/test/domain/computeSourceUnitDiff.test.ts @@ -0,0 +1,54 @@ +import type { SkillRunId, SourceId, SourceUnit, SourceUnitSha, SourceUnitState, Timestamp, WorkspaceId } from '@braidhq/schema' +import { describe, expect, it } from 'vitest' +import { computeSourceUnitDiff } from '../../src/index.js' + +function sha(byte: string): SourceUnitSha { + return byte.repeat(64) as SourceUnitSha +} + +function state(path: string, byte: string): SourceUnitState { + return { + workspaceId: 'ws-1' as WorkspaceId, + sourceId: 'src-1' as SourceId, + path, + lastObservedSha: sha(byte), + lastObservedAt: '2026-06-08T00:00:00.000Z' as Timestamp, + lastObservedByRunId: 'run-1' as SkillRunId, + } +} + +function unit(path: string, byte: string): SourceUnit { + return { sourceId: 'src-1' as SourceId, path, sha: sha(byte) } +} + +describe('computeSourceUnitDiff', () => { + it('partitions units into new / changed / unchanged / orphaned', () => { + const result = computeSourceUnitDiff( + [ + state('done.md', '1'), + state('changed.md', '2'), + state('orphaned.md', '3'), + ], + [ + unit('done.md', '1'), + unit('changed.md', 'a'), + unit('new.md', 'b'), + ], + ) + + expect(result.unchanged.map(u => u.path)).toEqual(['done.md']) + expect(result.changed.map(u => u.path)).toEqual(['changed.md']) + expect(result.new.map(u => u.path)).toEqual(['new.md']) + expect(result.orphaned.map(s => s.path)).toEqual(['orphaned.md']) + }) + + it('empty inputs return empty partitions', () => { + const result = computeSourceUnitDiff([], []) + expect(result).toEqual({ new: [], changed: [], unchanged: [], orphaned: [] }) + }) + + it('treats units with no prior state as new', () => { + const result = computeSourceUnitDiff([], [unit('a.md', '1'), unit('b.md', '2')]) + expect(result.new.map(u => u.path)).toEqual(['a.md', 'b.md']) + }) +}) diff --git a/packages/ontology-ddd/skills/braid-model/SKILL.md b/packages/ontology-ddd/skills/braid-model/SKILL.md index 1260786..caae6f7 100644 --- a/packages/ontology-ddd/skills/braid-model/SKILL.md +++ b/packages/ontology-ddd/skills/braid-model/SKILL.md @@ -70,6 +70,7 @@ This skill is shipped by the DDD ontology plugin (`@braidhq/ontology-ddd`). Its 3. Fetch the active ontology via `braid-core` so every type id you reference is canonical. Every `node.type` / `edge.type` you emit MUST equal one of the ids the ontology declares. Case-sensitive. 4. Fetch the model snapshot via `braid-core`, then run two node-search calls filtering by `status: 'draft'` and `status: 'unclear'` to enumerate work-in-progress nodes for validation. 5. Parse `$ARGUMENTS` (scope-hint / `validate` / empty) and pick the mode. +6. Check `$BRAID_CHANGED_UNITS` (optional, newline-separated `::` entries set by BatchService or Reactor). When present, prioritise the build pass around the listed units (bridges, containment, drift involving any of their nodes); when absent, walk the whole graph as before. The validate pass always covers the full graph regardless of this hint. ## Procedure diff --git a/packages/ontology-ddd/src/DDDOntology.ts b/packages/ontology-ddd/src/DDDOntology.ts index 90a1624..0670333 100644 --- a/packages/ontology-ddd/src/DDDOntology.ts +++ b/packages/ontology-ddd/src/DDDOntology.ts @@ -1,4 +1,4 @@ -import type { EdgeTypeId, NodeTypeId, SkillId } from '@braidhq/schema' +import { EdgeTypeId, NodeTypeId, SkillId } from '@braidhq/schema' import { defineOntology } from '@braidhq/sdk' /** @@ -18,15 +18,15 @@ export const dddOntology = defineOntology({ // here rather than in @braidhq/core. skills: [ { - id: 'braid-extract' as SkillId, + id: SkillId.parse('braid-extract'), directory: new URL('../skills/braid-extract', import.meta.url), }, { - id: 'braid-clarify' as SkillId, + id: SkillId.parse('braid-clarify'), directory: new URL('../skills/braid-clarify', import.meta.url), }, { - id: 'braid-model' as SkillId, + id: SkillId.parse('braid-model'), directory: new URL('../skills/braid-model', import.meta.url), }, ], @@ -42,7 +42,7 @@ export const dddOntology = defineOntology({ nodeTypes: [ { - id: 'boundedContext' as NodeTypeId, + id: NodeTypeId.parse('boundedContext'), label: 'Bounded Context', description: 'A subsystem with its own ubiquitous language; everything inside is one consistency boundary. Strategic DDD primitive (Evans Blue Book Part IV; Khononov 2021 ch. 3).', color: 'oklch(0.62 0.18 274)', @@ -50,50 +50,50 @@ export const dddOntology = defineOntology({ renderHint: { container: true, section: 'Use cases' }, }, { - id: 'aggregate' as NodeTypeId, + id: NodeTypeId.parse('aggregate'), label: 'Aggregate', description: 'Cluster of domain objects treated as a unit for data changes; has a single root entity that controls access. Tactical DDD primitive (Evans Blue Book Part II; Vernon IDDD ch. 10; Khononov 2021 ch. 6).', color: 'oklch(0.7 0.15 155)', defaultVisible: true, - renderHint: { expandedUnder: 'boundedContext' as NodeTypeId }, + renderHint: { expandedUnder: NodeTypeId.parse('boundedContext') }, }, { - id: 'command' as NodeTypeId, + id: NodeTypeId.parse('command'), label: 'Command', description: 'Imperative request that asks the system to change state; names use verbs (placeOrder, cancelOrder). CQRS primitive (Young 2010; Khononov 2021 ch. 11). The blue sticky in EventStorming.', color: 'oklch(0.65 0.18 250)', - renderHint: { expandedUnder: 'aggregate' as NodeTypeId }, + renderHint: { expandedUnder: NodeTypeId.parse('aggregate') }, }, { - id: 'query' as NodeTypeId, + id: NodeTypeId.parse('query'), label: 'Query', description: 'Read-only request that returns state without modifying it. CQRS primitive (Young 2010; Khononov 2021 ch. 11). Strict CQRS routes queries to a dedicated read-model node; in the absence of that node type, queries attach to the aggregate.', color: 'oklch(0.7 0.13 220)', - renderHint: { expandedUnder: 'aggregate' as NodeTypeId }, + renderHint: { expandedUnder: NodeTypeId.parse('aggregate') }, }, { - id: 'event' as NodeTypeId, + id: NodeTypeId.parse('event'), label: 'Domain Event', description: 'Past-tense fact about something that has already happened (OrderPlaced, ItemAdded). Tactical DDD primitive (Vernon IDDD ch. 8) and the orange sticky in EventStorming.', color: 'oklch(0.78 0.16 80)', - renderHint: { expandedUnder: 'command' as NodeTypeId }, + renderHint: { expandedUnder: NodeTypeId.parse('command') }, }, { - id: 'rule' as NodeTypeId, + id: NodeTypeId.parse('rule'), label: 'Business Rule', description: 'Invariant that must hold (MaxItemsRule, PositiveQuantityRule). Tactical DDD (Evans Specification pattern; Vernon IDDD invariants). Per-operation rules attach to a command or query; aggregate-wide invariants attach to the aggregate itself.', color: 'oklch(0.65 0.2 20)', - renderHint: { expandedUnder: 'command' as NodeTypeId }, + renderHint: { expandedUnder: NodeTypeId.parse('command') }, }, { - id: 'actor' as NodeTypeId, + id: NodeTypeId.parse('actor'), label: 'Actor', description: 'External role that triggers a command or query (Customer, Admin, BillingService). EventStorming primitive (Brandolini; the yellow stick-figure sticky) and Khononov 2021. Not in strict Evans / Vernon canon, where the issuer lives on the command\'s metadata.', color: 'oklch(0.72 0.13 310)', renderHint: { section: 'Actors' }, }, { - id: 'policy' as NodeTypeId, + id: NodeTypeId.parse('policy'), label: 'Policy', description: 'Automatic reaction: "when event X happens, do Y". EventStorming primitive (Brandolini; the purple sticky) and Khononov 2021. Materialises Vernon\'s Process Manager / Saga pattern when the reaction crosses aggregates or has its own naming.', color: 'oklch(0.62 0.2 310)', @@ -103,74 +103,74 @@ export const dddOntology = defineOntology({ edgeTypes: [ { - id: 'contains' as EdgeTypeId, + id: EdgeTypeId.parse('contains'), label: 'contains', description: 'A BoundedContext contains aggregates. Commands, queries, events, and rules belong to an aggregate and are reached via accepts, emits, or constrainedBy. Strategic DDD (Evans Blue Book Part IV).', - fromTypes: ['boundedContext' as NodeTypeId], - toTypes: ['aggregate' as NodeTypeId], + fromTypes: [NodeTypeId.parse('boundedContext')], + toTypes: [NodeTypeId.parse('aggregate')], cardinality: '1:N', color: 'oklch(0.62 0.18 274)', }, { - id: 'accepts' as EdgeTypeId, + id: EdgeTypeId.parse('accepts'), label: 'accepts', description: 'Aggregate is the entry point for operations against its state; commands modify the aggregate and queries read it. Tactical DDD with CQRS (Khononov 2021 ch. 11). Strict CQRS would route queries to a dedicated read-model node; this ontology routes both through the aggregate.', - fromTypes: ['aggregate' as NodeTypeId], - toTypes: ['command', 'query'] as NodeTypeId[], + fromTypes: [NodeTypeId.parse('aggregate')], + toTypes: [NodeTypeId.parse('command'), NodeTypeId.parse('query')], cardinality: '1:N', color: 'oklch(0.7 0.15 155)', }, { - id: 'emits' as EdgeTypeId, + id: EdgeTypeId.parse('emits'), label: 'emits', description: 'A command produces an event as the result of executing on its aggregate. Either source is valid in this ontology: command-source (CQRS / EventStorming visual reading: cmd → evt) and aggregate-source (Vernon IDDD structural reading: agg → evt). Khononov 2021 illustrates both; prefer the command-source form when extracting from PRD/spec language and the aggregate-source form when describing state ownership.', - fromTypes: ['command', 'aggregate'] as NodeTypeId[], - toTypes: ['event' as NodeTypeId], + fromTypes: [NodeTypeId.parse('command'), NodeTypeId.parse('aggregate')], + toTypes: [NodeTypeId.parse('event')], cardinality: '1:N', color: 'oklch(0.78 0.16 80)', }, { - id: 'triggers' as EdgeTypeId, + id: EdgeTypeId.parse('triggers'), label: 'triggers', description: 'Process-manager / saga flow: an event drives a downstream command or policy, often in a different aggregate. EventStorming flow notation (Brandolini); CQRS saga pattern (Khononov 2021 ch. 11).', - fromTypes: ['event' as NodeTypeId], - toTypes: ['command', 'policy'] as NodeTypeId[], + fromTypes: [NodeTypeId.parse('event')], + toTypes: [NodeTypeId.parse('command'), NodeTypeId.parse('policy')], cardinality: 'N:N', color: 'oklch(0.65 0.18 250)', }, { - id: 'enacts' as EdgeTypeId, + id: EdgeTypeId.parse('enacts'), label: 'enacts', description: 'A policy issues a command as its reaction. The "do Y" half of EventStorming\'s "when X then Y" / Vernon\'s Process Manager (Brandolini; Vernon IDDD ch. 13; Khononov 2021).', - fromTypes: ['policy' as NodeTypeId], - toTypes: ['command' as NodeTypeId], + fromTypes: [NodeTypeId.parse('policy')], + toTypes: [NodeTypeId.parse('command')], cardinality: 'N:N', color: 'oklch(0.62 0.2 310)', }, { - id: 'constrainedBy' as EdgeTypeId, + id: EdgeTypeId.parse('constrainedBy'), label: 'constrained by', description: 'Per-operation rule: a command or query is constrained by a specific rule. Aggregate-wide invariant: an aggregate is constrained by a rule that must hold across all of its operations. Tactical DDD (Evans Specification pattern; Vernon IDDD invariants).', - fromTypes: ['command', 'query', 'aggregate'] as NodeTypeId[], - toTypes: ['rule' as NodeTypeId], + fromTypes: [NodeTypeId.parse('command'), NodeTypeId.parse('query'), NodeTypeId.parse('aggregate')], + toTypes: [NodeTypeId.parse('rule')], cardinality: 'N:N', color: 'oklch(0.65 0.2 20)', }, { - id: 'dependsOn' as EdgeTypeId, + id: EdgeTypeId.parse('dependsOn'), label: 'depends on', description: 'Aggregates reference other aggregates by id only. Tactical DDD (Vernon IDDD "Reference Other Aggregates by Identity" rule; Khononov 2021 ch. 6). Cross-aggregate command or query coupling is expressed through triggers rather than direct references.', - fromTypes: ['aggregate' as NodeTypeId], - toTypes: ['aggregate' as NodeTypeId], + fromTypes: [NodeTypeId.parse('aggregate')], + toTypes: [NodeTypeId.parse('aggregate')], cardinality: 'N:N', color: 'oklch(0.7 0.13 220)', }, { - id: 'performedBy' as EdgeTypeId, + id: EdgeTypeId.parse('performedBy'), label: 'performed by', description: 'A command or query is triggered by an actor. EventStorming convention (Brandolini) and Khononov 2021. Not in strict Evans / Vernon canon, where the issuer lives on the command\'s metadata rather than as a graph edge.', - fromTypes: ['command', 'query'] as NodeTypeId[], - toTypes: ['actor' as NodeTypeId], + fromTypes: [NodeTypeId.parse('command'), NodeTypeId.parse('query')], + toTypes: [NodeTypeId.parse('actor')], cardinality: 'N:N', color: 'oklch(0.72 0.13 310)', }, @@ -179,67 +179,97 @@ export const dddOntology = defineOntology({ // All edges run BoundedContext to BoundedContext; each pattern has its own direction and cardinality. // These describe strategic relationships between teams and integrations, not derivable from individual feature slices. { - id: 'partnership' as EdgeTypeId, + id: EdgeTypeId.parse('partnership'), label: 'partnership', description: 'Symmetric: two BoundedContexts are committed to succeed or fail together; coordinated planning and joint releases. Strategic DDD Context Mapping (Evans Blue Book Part IV; Khononov 2021 ch. 4).', - fromTypes: ['boundedContext' as NodeTypeId], - toTypes: ['boundedContext' as NodeTypeId], + fromTypes: [NodeTypeId.parse('boundedContext')], + toTypes: [NodeTypeId.parse('boundedContext')], cardinality: 'N:N', color: 'oklch(0.6 0.15 240)', }, { - id: 'customerSupplier' as EdgeTypeId, + id: EdgeTypeId.parse('customerSupplier'), label: 'customer-supplier', description: 'Asymmetric (customer downstream, supplier upstream): the customer BoundedContext depends on the supplier and has political pull to ask for changes. Strategic DDD Context Mapping (Evans Blue Book Part IV; Khononov 2021 ch. 4).', - fromTypes: ['boundedContext' as NodeTypeId], - toTypes: ['boundedContext' as NodeTypeId], + fromTypes: [NodeTypeId.parse('boundedContext')], + toTypes: [NodeTypeId.parse('boundedContext')], cardinality: 'N:1', color: 'oklch(0.65 0.18 200)', }, { - id: 'conformist' as EdgeTypeId, + id: EdgeTypeId.parse('conformist'), label: 'conformist', description: 'Asymmetric (conformist downstream, upstream uncooperative): the downstream BoundedContext depends on an upstream it has no political pull over and adopts the upstream\'s model as-is. Strategic DDD Context Mapping (Evans Blue Book Part IV; Khononov 2021 ch. 4).', - fromTypes: ['boundedContext' as NodeTypeId], - toTypes: ['boundedContext' as NodeTypeId], + fromTypes: [NodeTypeId.parse('boundedContext')], + toTypes: [NodeTypeId.parse('boundedContext')], cardinality: 'N:1', color: 'oklch(0.55 0.13 50)', }, { - id: 'sharedKernel' as EdgeTypeId, + id: EdgeTypeId.parse('sharedKernel'), label: 'shared kernel', description: 'Symmetric: two BoundedContexts intentionally share a small piece of model (often a value object). Any change to the shared part requires coordination between both teams. Strategic DDD Context Mapping (Evans Blue Book Part IV; Khononov 2021 ch. 4).', - fromTypes: ['boundedContext' as NodeTypeId], - toTypes: ['boundedContext' as NodeTypeId], + fromTypes: [NodeTypeId.parse('boundedContext')], + toTypes: [NodeTypeId.parse('boundedContext')], cardinality: 'N:N', color: 'oklch(0.6 0.13 160)', }, { - id: 'anticorruptionLayer' as EdgeTypeId, + id: EdgeTypeId.parse('anticorruptionLayer'), label: 'anticorruption layer', description: 'Asymmetric (acl-owner downstream, upstream isolated from): the downstream BoundedContext isolates itself from the upstream by building a translation layer so its internal model is not corrupted by the upstream\'s shape. Strategic DDD Context Mapping (Evans Blue Book Part IV; Khononov 2021 ch. 4).', - fromTypes: ['boundedContext' as NodeTypeId], - toTypes: ['boundedContext' as NodeTypeId], + fromTypes: [NodeTypeId.parse('boundedContext')], + toTypes: [NodeTypeId.parse('boundedContext')], cardinality: 'N:N', color: 'oklch(0.55 0.18 30)', }, { - id: 'openHostService' as EdgeTypeId, + id: EdgeTypeId.parse('openHostService'), label: 'open host service', description: 'Asymmetric (host upstream, consumer downstream): the upstream BoundedContext offers a well-defined open protocol any downstream can consume without bespoke negotiation. Strategic DDD Context Mapping (Evans Blue Book Part IV; Khononov 2021 ch. 4).', - fromTypes: ['boundedContext' as NodeTypeId], - toTypes: ['boundedContext' as NodeTypeId], + fromTypes: [NodeTypeId.parse('boundedContext')], + toTypes: [NodeTypeId.parse('boundedContext')], cardinality: '1:N', color: 'oklch(0.65 0.15 130)', }, { - id: 'publishedLanguage' as EdgeTypeId, + id: EdgeTypeId.parse('publishedLanguage'), label: 'published language', description: 'Asymmetric (publisher upstream, consumer downstream): the upstream BoundedContext publishes a documented schema or format; downstreams consume it as-is. Often combined with openHostService. Strategic DDD Context Mapping (Evans Blue Book Part IV; Khononov 2021 ch. 4).', - fromTypes: ['boundedContext' as NodeTypeId], - toTypes: ['boundedContext' as NodeTypeId], + fromTypes: [NodeTypeId.parse('boundedContext')], + toTypes: [NodeTypeId.parse('boundedContext')], cardinality: '1:N', color: 'oklch(0.7 0.15 100)', }, ], + + /** + * Batch / reactor binding. DDD's per-unit skill is `braid-extract`; + * the checkpoint is `braid-model`, fired every 5 successful extracts + * and once more at the end of the loop for global validation. When + * the workspace has no intent source, `braid-scan` derives units + * from the codebase. + */ + batch: { + perUnit: { + skillId: SkillId.parse('braid-extract'), + label: 'Extract', + }, + checkpoint: { + skillId: SkillId.parse('braid-model'), + label: 'Model', + chunkSize: 5, + runAtEnd: true, + extraEnv: (units) => { + const hint = units + .filter(u => u.sourceId && u.scopeHint) + .map(u => `${u.sourceId}::${u.scopeHint}`) + .join('\n') + return hint ? { BRAID_CHANGED_UNITS: hint } : {} + }, + }, + deriveUnits: { + skillId: SkillId.parse('braid-scan'), + }, + }, }) diff --git a/packages/schema/src/batch.ts b/packages/schema/src/batch.ts index 9ee3e63..e5a84f0 100644 --- a/packages/schema/src/batch.ts +++ b/packages/schema/src/batch.ts @@ -1,5 +1,5 @@ import { z } from 'zod' -import { ClarifyTicketId, ProposalId, SkillRunId, SourceId, Timestamp, WorkspaceId } from './common.js' +import { ClarifyTicketId, ProposalId, SkillId, SkillRunId, SourceId, Timestamp, WorkspaceId } from './common.js' export const PlanUnitId = z.string().min(1).brand<'PlanUnitId'>() export type PlanUnitId = z.infer @@ -10,12 +10,13 @@ export type BatchPlanId = z.infer export const UnitStatus = z.enum(['pending', 'running', 'completed', 'failed', 'skipped']) export type UnitStatus = z.infer -export const BatchStatus = z.enum(['idle', 'scanning', 'running', 'completed', 'failed', 'stopped', 'archived']) +export const BatchStatus = z.enum(['idle', 'deriving', 'running', 'completed', 'failed', 'stopped', 'archived']) export type BatchStatus = z.infer -// `intent` walks each intent source directly. `scan` runs braid-scan first to -// derive units from a codebase when no intent docs exist. -export const BatchInputMode = z.enum(['intent', 'scan']) +// `intent` walks each intent source directly. `derive` first runs the +// ontology's `deriveUnits` skill to populate the unit list from a +// code-only workspace. +export const BatchInputMode = z.enum(['intent', 'derive']) export type BatchInputMode = z.infer export const PlanUnit = z.object({ @@ -23,7 +24,7 @@ export const PlanUnit = z.object({ name: z.string().min(1), description: z.string(), // Source this unit was derived from (mode='intent' = the intent source it belongs to). - // Omitted in mode='scan' because units come from the scanner's business-area decomposition. + // Omitted in mode='derive' because units come from the ontology's discovery skill. sourceId: SourceId.optional(), // What to pass to braid-extract as scope-hint. For intent mode this is the // doc / folder name within the source (e.g. "TSK00010 文字欄位.../"); empty @@ -46,6 +47,24 @@ export const BatchRunning = z.object({ }) export type BatchRunning = z.infer +export const BatchCheckpointPhaseStatus = z.enum(['running', 'completed', 'failed']) +export type BatchCheckpointPhaseStatus = z.infer + +/** + * One execution of the ontology-provided checkpoint skill. Append-only + * inside `BatchPlan.checkpointPhases`; `unitIds` records which units + * this run consumed so chunk accounting knows what's still pending. + */ +export const BatchCheckpointPhase = z.object({ + status: BatchCheckpointPhaseStatus, + unitIds: z.array(PlanUnitId), + startedAt: Timestamp.optional(), + completedAt: Timestamp.optional(), + skillRunId: SkillRunId.optional(), + error: z.string().optional(), +}) +export type BatchCheckpointPhase = z.infer + export const BatchPlan = z.object({ id: BatchPlanId, workspaceId: WorkspaceId, @@ -59,5 +78,24 @@ export const BatchPlan = z.object({ units: z.array(PlanUnit), running: BatchRunning.optional(), error: z.string().optional(), + /** + * Append-only history of checkpoint skill runs. Failed phases are + * dropped by `resumeRun` so a re-run starts chunk accounting fresh. + */ + checkpointPhases: z.array(BatchCheckpointPhase).default([]), + /** + * Frozen snapshot of the ontology's batch binding taken at start(). + * UI reads it to label steps and pre-split anticipated chunks; + * resume reads it so chunk accounting survives ontology config + * changes mid-plan. + */ + batchPolicy: z.object({ + perUnitSkillId: SkillId, + perUnitLabel: z.string().optional(), + checkpointSkillId: SkillId.optional(), + checkpointLabel: z.string().optional(), + checkpointChunkSize: z.number().int().positive().optional(), + checkpointRunAtEnd: z.boolean().optional(), + }).optional(), }) export type BatchPlan = z.infer diff --git a/packages/schema/src/history.ts b/packages/schema/src/history.ts index 60ca2d2..ad140ae 100644 --- a/packages/schema/src/history.ts +++ b/packages/schema/src/history.ts @@ -15,6 +15,7 @@ export const CommitKind = z.enum([ 'restore', 'snapshot', 'initial', + 'batch-archive', ]) export type CommitKind = z.infer diff --git a/packages/schema/src/index.ts b/packages/schema/src/index.ts index cbbe1ec..9ea8854 100644 --- a/packages/schema/src/index.ts +++ b/packages/schema/src/index.ts @@ -14,6 +14,7 @@ export * from './proposal-preview.js' export * from './proposal.js' export * from './qa.js' export * from './skill.js' +export * from './source-unit-state.js' export * from './source.js' export * from './storage.js' export * from './user.js' diff --git a/packages/schema/src/source-unit-state.ts b/packages/schema/src/source-unit-state.ts new file mode 100644 index 0000000..bfc2ab4 --- /dev/null +++ b/packages/schema/src/source-unit-state.ts @@ -0,0 +1,70 @@ +import { z } from 'zod' +import { SkillRunId, SourceId, Timestamp, WorkspaceId } from './common.js' + +/** + * Stable fingerprint of a source unit's on-disk content. SHA-256 hex. + * Computed by the runtime at observation time; stored on each + * `SourceUnitState`. Branded so callers can't accidentally pass an + * arbitrary string. + */ +export const SourceUnitSha = z.string().regex(/^[a-f0-9]{64}$/i).brand<'SourceUnitSha'>() +export type SourceUnitSha = z.infer + +/** + * A source unit's identity and current on-disk fingerprint. The shape + * Reactor / BatchService / Studio hand around when asking "what's the + * current state of this unit on disk vs what we last observed". The + * trailing-slash convention from the intent scanner (folder = `name/`, + * file = `name`) is preserved verbatim so identity keys round-trip + * without normalisation surprises. + */ +export const SourceUnit = z.object({ + sourceId: SourceId, + path: z.string().min(1), + sha: SourceUnitSha, +}) +export type SourceUnit = z.infer + +/** + * Framework-level record of "we observed this source unit at this + * version, via this run, at this time". One entity per + * `(workspaceId, sourceId, path)`. Updated by `SourceUnitStateService` + * whenever an orchestrator (Batch, Reactor, manual dispatch) finishes a + * skill run that consumed the unit. + * + * No lifecycle states. Existence + current sha is all the state there + * is. Removed source units leave their entry behind as an audit trace; + * a later garbage-collection step may prune them. + * + * Deliberately ontology-neutral: this is *not* "intent extraction + * state". Any source role (intent or code) and any consuming skill + * (extract, model, future) can record observations here. + */ +export const SourceUnitState = z.object({ + workspaceId: WorkspaceId, + sourceId: SourceId, + path: z.string().min(1), + lastObservedSha: SourceUnitSha, + lastObservedAt: Timestamp, + lastObservedByRunId: SkillRunId.optional(), +}) +export type SourceUnitState = z.infer + +/** + * Result of comparing the on-disk units against the recorded + * `SourceUnitState`s. Reactor consumes this directly to decide what to + * re-extract. + * + * - `new`: unit not in state store + * - `changed`: in state store, sha differs + * - `unchanged`: in state store, sha matches (skip) + * - `orphaned`: state entry whose unit is no longer on disk (kept for + * audit; not deleted in v0) + */ +export const SourceUnitDiff = z.object({ + new: z.array(SourceUnit), + changed: z.array(SourceUnit), + unchanged: z.array(SourceUnit), + orphaned: z.array(SourceUnitState), +}) +export type SourceUnitDiff = z.infer diff --git a/packages/sdk/src/defineOntology.ts b/packages/sdk/src/defineOntology.ts index 503e52e..5562a43 100644 --- a/packages/sdk/src/defineOntology.ts +++ b/packages/sdk/src/defineOntology.ts @@ -1,6 +1,7 @@ import type { EdgeTypeDescriptor, NodeTypeDescriptor, + OntologyBatchBinding, OntologyPlugin, OntologyValidator, } from '@braidhq/core' @@ -44,6 +45,13 @@ export interface DefineOntologyInput { readonly configSchema?: z.ZodTypeAny /** Optional explicit plugin id; defaults to `ontology.`. */ readonly pluginId?: string + /** + * Optional batch / reactor binding. Declare which skill processes a + * single intent unit, the (optional) checkpoint configuration, and + * the (optional) derive-units skill. Without this binding the + * workspace cannot start a batch under this ontology. + */ + readonly batch?: OntologyBatchBinding } /** @@ -106,6 +114,7 @@ export function defineOntology(input: DefineOntologyInput): OntologyPlugin { skills: input.skills ?? [], referenceDirs: input.referenceDirs ?? [], validators: [], + ...(input.batch ? { batch: input.batch } : {}), } const validators: OntologyValidator[] = [ diff --git a/packages/server/src/app.ts b/packages/server/src/app.ts index 4ca613d..00f58f7 100644 --- a/packages/server/src/app.ts +++ b/packages/server/src/app.ts @@ -22,6 +22,7 @@ import { createProposalsRouter } from './routes/proposals.js' import { createRunsRouter } from './routes/runs.js' import { createSkillInputOptionsRouter } from './routes/skillInputOptions.js' import { createSkillsRouter } from './routes/skills.js' +import { createSourceUnitStatesRouter } from './routes/sourceUnitStates.js' import { createUsersRouter } from './routes/users.js' import { createWorkspaceEventsRouter } from './routes/workspaceEvents.js' import { createTransferOwnershipRouter, createWorkspaceMembersRouter } from './routes/workspaceMembers.js' @@ -140,6 +141,9 @@ export function createApp(deps: AppDependencies, options: AppOptions = {}): Open decisionRepository: deps.decisionRepository, })) workspaceScoped.route('/decisions', createDecisionsRouter({ decisionRepository: deps.decisionRepository })) + workspaceScoped.route('/source-unit-states', createSourceUnitStatesRouter({ + sourceUnitStateService: deps.sourceUnitStateService, + })) workspaceScoped.route('/ontology', createOntologyRouter({ workspaceRepository: deps.workspaceRepository, pluginRegistry: deps.pluginRegistry, diff --git a/packages/server/src/composeFs.ts b/packages/server/src/composeFs.ts index 5e5136f..9213ee0 100644 --- a/packages/server/src/composeFs.ts +++ b/packages/server/src/composeFs.ts @@ -1,5 +1,5 @@ import type { AgentPlugin, OntologyPlugin, SourceLoaderPlugin, StoragePlugin } from '@braidhq/core' -import type { AbsolutePath, AgentEffort, AgentId, AgentKind, StorageKind, WorkspaceId } from '@braidhq/schema' +import type { AbsolutePath, AgentEffort, StorageKind, WorkspaceId } from '@braidhq/schema' import type { AppDependencies } from './composition.js' import { spawn } from 'node:child_process' import { homedir } from 'node:os' @@ -17,7 +17,7 @@ import { WorkspaceBootstrap, } from '@braidhq/core' import { dddOntology } from '@braidhq/ontology-ddd' -import { StorageKind as StorageKindSchema } from '@braidhq/schema' +import { AgentId, AgentKind, StorageKind as StorageKindSchema } from '@braidhq/schema' import { GoogleDriveLoader } from '@braidhq/source-loader-gdrive' import { GitLoader } from '@braidhq/source-loader-git' import { kuzuStoragePlugin } from '@braidhq/storage-kuzu' @@ -34,6 +34,8 @@ import { FsGraphSerializer } from './infrastructure/fs/FsGraphSerializer.js' import { FsProposalRepository } from './infrastructure/fs/FsProposalRepository.js' import { FsRunRepository } from './infrastructure/fs/FsRunRepository.js' import { FsSkillRegistry } from './infrastructure/fs/FsSkillRegistry.js' +import { FsSourceUnitDigest } from './infrastructure/fs/FsSourceUnitDigest.js' +import { FsSourceUnitStateRepository } from './infrastructure/fs/FsSourceUnitStateRepository.js' import { FsWorkspaceRepository } from './infrastructure/fs/FsWorkspaceRepository.js' import { listIntentItems } from './infrastructure/fs/intentScan.js' import { discoverCanonicalWorkspaces } from './infrastructure/fs/WorkspaceDiscovery.js' @@ -237,9 +239,9 @@ export async function composeFsApp(options: ComposeFsOptions = {}): Promise { + throw new Error( + 'SourceUnitDigest is not wired. Pass `sourceUnitDigest` to composeApp() (composeFsApp does this automatically).', + ) + } +} diff --git a/packages/server/src/infrastructure/agent/SubprocessSkillRunner.ts b/packages/server/src/infrastructure/agent/SubprocessSkillRunner.ts index dedc0d3..a34f6a3 100644 --- a/packages/server/src/infrastructure/agent/SubprocessSkillRunner.ts +++ b/packages/server/src/infrastructure/agent/SubprocessSkillRunner.ts @@ -9,16 +9,12 @@ import type { Workspace, WorkspaceEventBus, } from '@braidhq/core' -import type { AbsolutePath, McpServerId, RunRecord, SkillEvent, SkillId, SkillRunId } from '@braidhq/schema' +import type { AbsolutePath, RunRecord, SkillEvent, SkillId, SkillRunId } from '@braidhq/schema' import type { ChildProcess, SpawnOptions } from 'node:child_process' import { mkdir, rm, symlink } from 'node:fs/promises' import { dirname, join } from 'node:path' import { newSkillRunId, NotFoundError } from '@braidhq/core' -import { - AbsolutePath as AbsolutePathSchema, - SkillEvent as SkillEventSchema, - SkillRunId as SkillRunIdSchema, -} from '@braidhq/schema' +import { AbsolutePath as AbsolutePathSchema, McpServerId, SkillEvent as SkillEventSchema, SkillRunId as SkillRunIdSchema } from '@braidhq/schema' import { sessionDirPath } from '../fs/paths.js' import { createAsyncQueue } from './asyncQueue.js' import { writeMcpConfigFile } from './mcpConfig.js' @@ -101,21 +97,28 @@ export class SubprocessSkillRunner implements SkillRunner { const manifest = await this.deps.skillRegistry.get(workspace, skillId) const runId = newSkillRunId() const sessionDir = await this.resolveSessionDir(workspace, runId, options?.resumeSessionId) + const gatewayArgs = [ + 'openapi-mcp-gateway', + '--spec', + this.deps.coreGateway?.specUrl ?? '', + '--transport', + 'stdio', + '--name', + 'braid-core', + // Forward the caller's Bearer token so the gateway authenticates + // its outgoing API calls. The gateway resolves `${BRAID_TOKEN}` + // against its inherited process env at startup; without this the + // server's auth middleware rejects every callback with 401. + // eslint-disable-next-line no-template-curly-in-string + ...(options?.callerToken ? ['--auth-type', 'bearer', '--auth-token', '${BRAID_TOKEN}'] : []), + ] const mcpConfigFile = await writeMcpConfigFile(workspace, sessionDir, { extraServers: this.deps.coreGateway ? [{ - id: 'braid-core' as McpServerId, + id: McpServerId.parse('braid-core'), transport: 'stdio', command: this.deps.coreGateway.uvxBin ?? 'uvx', - args: [ - 'openapi-mcp-gateway', - '--spec', - this.deps.coreGateway.specUrl, - '--transport', - 'stdio', - '--name', - 'braid-core', - ], + args: gatewayArgs, }] : [], }) @@ -135,7 +138,15 @@ export class SubprocessSkillRunner implements SkillRunner { // otherwise guess wrong about which one `.claude/skills/...` is rooted in. const child = spawnFn(invocation.bin, [...invocation.args], { cwd: sessionDir, - env: { ...invocation.env, BRAID_SESSION_DIR: sessionDir }, + env: { + ...invocation.env, + BRAID_SESSION_DIR: sessionDir, + // BRAID_TOKEN is read by the braid-core MCP gateway and by any + // shell-level callback (curl in a SKILL.md) so the subprocess + // can authenticate against the running server. + ...(options?.callerToken ? { BRAID_TOKEN: options.callerToken } : {}), + ...(options?.extraEnv ?? {}), + }, stdio: ['ignore', 'pipe', 'pipe'], }) this.running.set(runId, { workspace, child }) diff --git a/packages/server/src/infrastructure/fs/FsSourceUnitDigest.ts b/packages/server/src/infrastructure/fs/FsSourceUnitDigest.ts new file mode 100644 index 0000000..b5a316c --- /dev/null +++ b/packages/server/src/infrastructure/fs/FsSourceUnitDigest.ts @@ -0,0 +1,65 @@ +import type { SourceUnitDigest, Workspace } from '@braidhq/core' +import type { SourceId, SourceUnitSha } from '@braidhq/schema' +import { createHash } from 'node:crypto' +import { readdir, readFile, stat } from 'node:fs/promises' +import { isAbsolute, join, relative } from 'node:path' + +/** + * Filesystem-backed `SourceUnitDigest`. Resolves the unit's relative + * path under its filesystem source root, then either: + * + * - regular file: sha256 of file bytes + * - directory: recursive walk; hash each file by content; build a + * deterministic `:\n` manifest sorted by relpath; + * sha256 the manifest. Dot-prefixed entries are skipped (mirrors the + * intent scanner's convention). + * + * Trailing-slash convention from the intent scanner (file = `foo.md`, + * folder = `foo/`) is accepted. + */ +export class FsSourceUnitDigest implements SourceUnitDigest { + async computeSha(workspace: Workspace, sourceId: SourceId, path: string): Promise { + const source = workspace.sources.find(s => s.id === sourceId) + if (!source || source.kind !== 'filesystem') { + throw new Error( + `Cannot compute digest for non-filesystem source "${sourceId}" on workspace "${workspace.id}"`, + ) + } + const sourceRoot = isAbsolute(source.path) ? source.path : join(workspace.rootPath, source.path) + const stripped = path.endsWith('/') ? path.slice(0, -1) : path + const target = join(sourceRoot, stripped) + const info = await stat(target) + if (info.isDirectory()) + return hashDirectory(target) as Promise + return hashFile(target) as Promise + } +} + +async function hashFile(filePath: string): Promise { + const buf = await readFile(filePath) + return createHash('sha256').update(buf).digest('hex') +} + +async function hashDirectory(dirPath: string): Promise { + const files: Array<{ rel: string, hash: string }> = [] + await walk(dirPath, dirPath, files) + files.sort((a, b) => (a.rel < b.rel ? -1 : a.rel > b.rel ? 1 : 0)) + const manifest = files.map(f => `${f.rel}:${f.hash}`).join('\n') + return createHash('sha256').update(manifest).digest('hex') +} + +async function walk(root: string, dir: string, out: Array<{ rel: string, hash: string }>): Promise { + const entries = await readdir(dir, { withFileTypes: true }) + for (const entry of entries) { + if (entry.name.startsWith('.')) + continue + const abs = join(dir, entry.name) + if (entry.isDirectory()) { + await walk(root, abs, out) + continue + } + if (!entry.isFile()) + continue + out.push({ rel: relative(root, abs), hash: await hashFile(abs) }) + } +} diff --git a/packages/server/src/infrastructure/fs/FsSourceUnitStateRepository.ts b/packages/server/src/infrastructure/fs/FsSourceUnitStateRepository.ts new file mode 100644 index 0000000..cf484a4 --- /dev/null +++ b/packages/server/src/infrastructure/fs/FsSourceUnitStateRepository.ts @@ -0,0 +1,104 @@ +import type { SourceUnitStateRepository } from '@braidhq/core' +import type { AbsolutePath, SourceId, SourceUnitState, WorkspaceId } from '@braidhq/schema' +import type { Dirent } from 'node:fs' +import { mkdir, readdir, readFile, rename, writeFile } from 'node:fs/promises' +import { join } from 'node:path' +import process from 'node:process' +import { NotFoundError } from '@braidhq/core' +import { SourceUnitState as SourceUnitStateSchema } from '@braidhq/schema' +import { sourceUnitStateDir, sourceUnitStateFilePath, sourceUnitStateSourceDir } from './paths.js' + +export interface FsSourceUnitStateRepositoryOptions { + /** + * Lookup of `workspaceId → workspaceRoot`. Shared with the other Fs + * repositories so this repo doesn't introduce a separate dependency + * on `WorkspaceService`. `composeFs.ts` builds a single closure once + * and passes it to every fs repo. + */ + readonly workspaceRoots: () => Promise> +} + +/** + * Filesystem-backed `SourceUnitStateRepository`. One JSON file per + * entity at `artifacts/source-unit-state//.json`, + * matching the file-per-entity pattern used by proposals / clarify. + * + * The file body is exactly the `SourceUnitState` shape; no wrapper or + * envelope. A future SQLite or Postgres impl maps each file to one row + * with composite PK `(workspaceId, sourceId, path)`. + */ +export class FsSourceUnitStateRepository implements SourceUnitStateRepository { + constructor(private readonly options: FsSourceUnitStateRepositoryOptions) {} + + async find(workspaceId: WorkspaceId, sourceId: SourceId, path: string): Promise { + const root = await this.resolveRoot(workspaceId) + const file = sourceUnitStateFilePath(root, sourceId, path) + try { + const raw = await readFile(file, 'utf-8') + return SourceUnitStateSchema.parse(JSON.parse(raw)) + } + catch (error) { + if ((error as NodeJS.ErrnoException).code === 'ENOENT') + return null + throw error + } + } + + async save(state: SourceUnitState): Promise { + const root = await this.resolveRoot(state.workspaceId) + const file = sourceUnitStateFilePath(root, state.sourceId, state.path) + await mkdir(sourceUnitStateSourceDir(root, state.sourceId), { recursive: true }) + const tmp = `${file}.tmp-${process.pid}-${Date.now()}` + await writeFile(tmp, `${JSON.stringify(state, null, 2)}\n`, 'utf-8') + await rename(tmp, file) + } + + async listByWorkspace(workspaceId: WorkspaceId): Promise { + const root = await this.resolveRoot(workspaceId) + const baseDir = sourceUnitStateDir(root) + const sourceDirs = await safeReaddir(baseDir) + const all: SourceUnitState[] = [] + for (const sourceDir of sourceDirs) { + if (!sourceDir.isDirectory()) + continue + all.push(...await readEntriesIn(join(baseDir, sourceDir.name))) + } + return all + } + + async listBySource(workspaceId: WorkspaceId, sourceId: SourceId): Promise { + const root = await this.resolveRoot(workspaceId) + return readEntriesIn(sourceUnitStateSourceDir(root, sourceId)) + } + + private async resolveRoot(workspaceId: WorkspaceId): Promise { + const roots = await this.options.workspaceRoots() + const root = roots.get(workspaceId) + if (!root) + throw new NotFoundError(`Workspace "${workspaceId}" has no registered root path`) + return root + } +} + +async function safeReaddir(dir: string): Promise { + try { + return await readdir(dir, { withFileTypes: true }) + } + catch (error) { + if ((error as NodeJS.ErrnoException).code === 'ENOENT') + return [] + throw error + } +} + +async function readEntriesIn(dir: string): Promise { + const entries = await safeReaddir(dir) + const results: SourceUnitState[] = [] + for (const entry of entries) { + if (!entry.isFile() || !entry.name.endsWith('.json')) + continue + const raw = await readFile(join(dir, entry.name), 'utf-8') + results.push(SourceUnitStateSchema.parse(JSON.parse(raw))) + } + return results +} diff --git a/packages/server/src/infrastructure/fs/FsWorkspaceRepository.ts b/packages/server/src/infrastructure/fs/FsWorkspaceRepository.ts index 140f9f9..7aa732b 100644 --- a/packages/server/src/infrastructure/fs/FsWorkspaceRepository.ts +++ b/packages/server/src/infrastructure/fs/FsWorkspaceRepository.ts @@ -2,11 +2,13 @@ import type { Workspace as WorkspaceData } from '@braidhq/schema' import type { WorkspaceRegistryFile } from './WorkspaceRegistryFile.js' import { readFile, stat } from 'node:fs/promises' import { resolve } from 'node:path' -import { NotFoundError, ValidationError, Workspace, type WorkspaceRepository } from '@braidhq/core' +import { createLogger, NotFoundError, ValidationError, Workspace, type WorkspaceRepository } from '@braidhq/core' import { AbsolutePath, ProductManifest, WorkspaceId } from '@braidhq/schema' import { parseMarkdownFrontmatter } from './frontmatter.js' import { workspaceProductManifestPath } from './paths.js' +const log = createLogger('server').child({ mod: 'workspace-repo' }) + export interface FsWorkspaceRepositoryDeps { readonly registry: WorkspaceRegistryFile } @@ -16,11 +18,20 @@ export class FsWorkspaceRepository implements WorkspaceRepository { constructor(private readonly deps: FsWorkspaceRepositoryDeps) {} + // One unreadable rootPath (deleted dir, broken PRODUCT.md, ...) + // must not starve the rest. The asymmetry with `load()` is intentional: + // aggregate views recover, single-entity lookups distinguish present + // from absent. async list(): Promise { const rootPaths = await this.deps.registry.list() const workspaces: Workspace[] = [] for (const rootPath of rootPaths) { - workspaces.push(await this.load(rootPath)) + try { + workspaces.push(await this.load(rootPath)) + } + catch (err) { + log.warn({ err, rootPath }, 'skipping unreadable registry entry') + } } return workspaces } diff --git a/packages/server/src/infrastructure/fs/paths.ts b/packages/server/src/infrastructure/fs/paths.ts index bd0f3ac..09f64a7 100644 --- a/packages/server/src/infrastructure/fs/paths.ts +++ b/packages/server/src/infrastructure/fs/paths.ts @@ -38,6 +38,30 @@ export function batchPlanPath(workspaceRoot: AbsolutePath): string { return join(workspaceArtifactsDir(workspaceRoot), 'batch-plan.json') } +export function sourceUnitStateDir(workspaceRoot: AbsolutePath): string { + return join(workspaceArtifactsDir(workspaceRoot), 'source-unit-state') +} + +export function sourceUnitStateFilePath( + workspaceRoot: AbsolutePath, + sourceId: string, + relativePath: string, +): string { + // Drop trailing slash from folder units so file name doesn't end in + // `/.json`. Replace path separators inside the unit name with `__` + // so a multi-level folder unit still maps to one file. + const trimmed = relativePath.endsWith('/') ? relativePath.slice(0, -1) : relativePath + const flattened = trimmed.replace(/\//g, '__') + return join(sourceUnitStateDir(workspaceRoot), sourceId, `${flattened}.json`) +} + +export function sourceUnitStateSourceDir( + workspaceRoot: AbsolutePath, + sourceId: string, +): string { + return join(sourceUnitStateDir(workspaceRoot), sourceId) +} + export function proposalsDir(workspaceRoot: AbsolutePath, status: ProposalStatus): string { return join(workspaceArtifactsDir(workspaceRoot), 'proposals', status) } diff --git a/packages/server/src/middleware/auth.ts b/packages/server/src/middleware/auth.ts index ea0d91e..707d56c 100644 --- a/packages/server/src/middleware/auth.ts +++ b/packages/server/src/middleware/auth.ts @@ -10,7 +10,12 @@ import { parseBoolEnv } from '../infrastructure/env.js' * Path prefixes (not exact match) so `/auth/google/start`, * `/auth/google/callback`, `/auth/whoami` all flow through without a * token. `/health` stays open so platform probes don't need a token. + * `/openapi.json` matches the broad industry convention that OpenAPI + * descriptions are publicly readable (Swagger UI, codegen, MCP gateways + * default to fetching them without auth). It only documents shape, not + * data, so leaking it to anonymous callers is safe. */ +const PUBLIC_EXACT = new Set(['/openapi.json']) const PUBLIC_PREFIXES = ['/auth/', '/health'] export interface AuthMiddlewareOptions { @@ -29,11 +34,32 @@ export interface AuthMiddlewareOptions { readonly localTrust?: boolean } +// EventSource (browser SSE) cannot send custom Authorization headers, so +// for live-update endpoints we additionally accept a `?token=...` query +// parameter and treat it identically to a Bearer token. Limited to paths +// ending in `/events` so this lenient path doesn't apply to mutating routes. +function isSseEventsPath(path: string): boolean { + return path.endsWith('/events') +} + +/** + * Pull a Bearer token off the request, or return `undefined` if the + * header is missing / malformed. Route handlers use this to forward the + * caller's identity to spawned skill subprocesses without re-parsing. + */ +export function extractBearerToken(context: { req: { header: (name: string) => string | undefined } }): string | undefined { + const header = context.req.header('Authorization') + if (!header || !header.startsWith('Bearer ')) + return undefined + const token = header.slice('Bearer '.length).trim() + return token.length > 0 ? token : undefined +} + export function authMiddleware(options: AuthMiddlewareOptions): MiddlewareHandler { const localTrust = options.localTrust ?? parseBoolEnv(process.env.BRAID_LOCAL_TRUST, false) return async (context, next) => { const path = context.req.path - if (PUBLIC_PREFIXES.some(prefix => path.startsWith(prefix))) { + if (PUBLIC_EXACT.has(path) || PUBLIC_PREFIXES.some(prefix => path.startsWith(prefix))) { await next() return undefined } @@ -45,7 +71,13 @@ export function authMiddleware(options: AuthMiddlewareOptions): MiddlewareHandle return undefined } const header = context.req.header('Authorization') - const token = header?.startsWith('Bearer ') ? header.slice('Bearer '.length).trim() : null + let token = header?.startsWith('Bearer ') ? header.slice('Bearer '.length).trim() : null + // Fall back to a query-param token for SSE endpoints only. + if (!token && isSseEventsPath(path)) { + const queryToken = context.req.query('token') + if (queryToken) + token = queryToken + } if (!token) { return context.json( { diff --git a/packages/server/src/routes/batch.ts b/packages/server/src/routes/batch.ts index 8d2a306..26c37b5 100644 --- a/packages/server/src/routes/batch.ts +++ b/packages/server/src/routes/batch.ts @@ -3,6 +3,7 @@ import { NotFoundError } from '@braidhq/core' import { zValidator } from '@hono/zod-validator' import { Hono } from 'hono' import { z } from 'zod' +import { extractBearerToken } from '../middleware/auth.js' import { getWorkspaceId } from '../middleware/workspaceId.js' const StartBody = z.object({ @@ -19,7 +20,11 @@ export function createBatchRouter(deps: BatchRouterDeps): Hono { router.post('/', zValidator('json', StartBody), async (context) => { const workspaceId = getWorkspaceId(context) const { autoApply } = context.req.valid('json') - const plan = await deps.batchService.start(workspaceId, { autoApply }) + const callerToken = extractBearerToken(context) + const plan = await deps.batchService.start(workspaceId, { + autoApply, + ...(callerToken ? { callerToken } : {}), + }) return context.json(plan.toData(), 202) }) @@ -39,7 +44,8 @@ export function createBatchRouter(deps: BatchRouterDeps): Hono { router.post('/resume', async (context) => { const workspaceId = getWorkspaceId(context) - const plan = await deps.batchService.resume(workspaceId) + const callerToken = extractBearerToken(context) + const plan = await deps.batchService.resume(workspaceId, callerToken ? { callerToken } : {}) return context.json(plan.toData(), 202) }) diff --git a/packages/server/src/routes/skills.ts b/packages/server/src/routes/skills.ts index 8c0d93c..480aea0 100644 --- a/packages/server/src/routes/skills.ts +++ b/packages/server/src/routes/skills.ts @@ -3,9 +3,9 @@ import type { SkillRunner, WorkspaceRepository, } from '@braidhq/core' -import type { SkillId } from '@braidhq/schema' import { SkillId as SkillIdSchema, SkillManifest, SkillRunId } from '@braidhq/schema' import { createRoute, OpenAPIHono, z } from '@hono/zod-openapi' +import { extractBearerToken } from '../middleware/auth.js' import { requirePermission } from '../middleware/workspaceAccess.js' import { getWorkspaceId } from '../middleware/workspaceId.js' import { NotFoundResponse, WorkspaceIdParam } from './_shared.js' @@ -98,7 +98,7 @@ export function createSkillsRouter(deps: SkillsRouterDeps): OpenAPIHono { // manifest once and hands the result to the policy check. router.use('/:skillId/run', requirePermission('skill.run', async (context) => { const workspace = await loadWorkspaceById(getWorkspaceId(context), deps.workspaceRepository) - const skillId = context.req.param('skillId') as SkillId + const skillId = SkillIdSchema.parse(context.req.param('skillId')) const manifest = await deps.skillRegistry.get(workspace, skillId) return { skill: manifest.toData().frontmatter, skillId } })) @@ -122,8 +122,12 @@ export function createSkillsRouter(deps: SkillsRouterDeps): OpenAPIHono { const workspace = await loadWorkspaceById(getWorkspaceId(context), deps.workspaceRepository) const { skillId } = context.req.valid('param') const { args, resumeSessionId } = context.req.valid('json') - const options = resumeSessionId ? { resumeSessionId } : undefined - const runId = await deps.skillRunner.start(workspace, skillId as SkillId, args, options) + const callerToken = extractBearerToken(context) + const options = { + ...(resumeSessionId ? { resumeSessionId } : {}), + ...(callerToken ? { callerToken } : {}), + } + const runId = await deps.skillRunner.start(workspace, skillId, args, options) return context.json({ runId }, 202) }) diff --git a/packages/server/src/routes/sourceUnitStates.ts b/packages/server/src/routes/sourceUnitStates.ts new file mode 100644 index 0000000..36b6692 --- /dev/null +++ b/packages/server/src/routes/sourceUnitStates.ts @@ -0,0 +1,55 @@ +import type { SourceUnitStateService } from '@braidhq/core' +import { SourceId, SourceUnitState } from '@braidhq/schema' +import { createRoute, OpenAPIHono, z } from '@hono/zod-openapi' +import { getWorkspaceId } from '../middleware/workspaceId.js' +import { ValidationFailureResponse, WorkspaceIdParam } from './_shared.js' + +const ListQuery = z.object({ + sourceId: SourceId.optional().openapi({ description: 'Restrict to one source.' }), +}) + +const ListResponse = z.object({ + items: z.array(SourceUnitState), +}).openapi('SourceUnitStateListResponse') + +export interface SourceUnitStatesRouterDeps { + sourceUnitStateService: SourceUnitStateService +} + +const listRoute = createRoute({ + method: 'get', + path: '/', + operationId: 'listSourceUnitStates', + summary: 'List recorded observations per source unit for a workspace.', + description: 'Returns the framework\'s current view of each source unit ' + + 'last seen by an extract run. One entry per (sourceId, path). Used ' + + 'by Studio to display per-unit freshness and by Reactor to compute ' + + 'what needs re-extraction.', + tags: ['source-unit-states'], + request: { + params: WorkspaceIdParam, + query: ListQuery, + }, + responses: { + 200: { + description: 'The recorded observations.', + content: { 'application/json': { schema: ListResponse } }, + }, + 400: ValidationFailureResponse, + }, +}) + +export function createSourceUnitStatesRouter(deps: SourceUnitStatesRouterDeps): OpenAPIHono { + const router = new OpenAPIHono() + + router.openapi(listRoute, async (context) => { + const workspaceId = getWorkspaceId(context) + const { sourceId } = context.req.valid('query') + const items = sourceId + ? await deps.sourceUnitStateService.listBySource(workspaceId, sourceId) + : await deps.sourceUnitStateService.listByWorkspace(workspaceId) + return context.json({ items: [...items] }, 200) + }) + + return router +} diff --git a/packages/server/test/infrastructure/fs/FsBatchPlanRepository.test.ts b/packages/server/test/infrastructure/fs/FsBatchPlanRepository.test.ts index ef7ba6b..fe4690f 100644 --- a/packages/server/test/infrastructure/fs/FsBatchPlanRepository.test.ts +++ b/packages/server/test/infrastructure/fs/FsBatchPlanRepository.test.ts @@ -32,6 +32,7 @@ function makePlan(): BatchPlan { status: 'running', autoApply: true, units: [makeUnit('pu-a'), makeUnit('pu-b')], + checkpointPhases: [], }) } diff --git a/packages/server/test/infrastructure/fs/FsSourceUnitDigest.test.ts b/packages/server/test/infrastructure/fs/FsSourceUnitDigest.test.ts new file mode 100644 index 0000000..b656970 --- /dev/null +++ b/packages/server/test/infrastructure/fs/FsSourceUnitDigest.test.ts @@ -0,0 +1,89 @@ +import type { AbsolutePath, SourceId } from '@braidhq/schema' +import { mkdir, mkdtemp, writeFile } from 'node:fs/promises' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import { describe, expect, it } from 'vitest' +import { FsSourceUnitDigest } from '../../../src/infrastructure/fs/FsSourceUnitDigest.js' +import { makeWorkspace } from '../../helpers/fakes.js' + +async function makeRoot(): Promise { + return (await mkdtemp(join(tmpdir(), 'braid-source-digest-'))) as AbsolutePath +} + +function intentSource(id: string, path: AbsolutePath) { + return { + kind: 'filesystem' as const, + id: id as SourceId, + role: 'intent' as const, + name: id, + path, + } +} + +describe('FsSourceUnitDigest', () => { + it('hashes a single file by content; identical content yields identical sha', async () => { + const sourcePath = await makeRoot() + await writeFile(join(sourcePath, 'a.md'), 'hello braid\n', 'utf-8') + const ws = makeWorkspace({ rootPath: sourcePath, sources: [intentSource('intent', sourcePath)] }) + const digest = new FsSourceUnitDigest() + + const sha1 = await digest.computeSha(ws, 'intent' as SourceId, 'a.md') + const sha2 = await digest.computeSha(ws, 'intent' as SourceId, 'a.md') + expect(sha1).toMatch(/^[a-f0-9]{64}$/) + expect(sha1).toBe(sha2) + }) + + it('different content yields different sha', async () => { + const sourcePath = await makeRoot() + const ws = makeWorkspace({ rootPath: sourcePath, sources: [intentSource('intent', sourcePath)] }) + const digest = new FsSourceUnitDigest() + + await writeFile(join(sourcePath, 'a.md'), 'first\n', 'utf-8') + const before = await digest.computeSha(ws, 'intent' as SourceId, 'a.md') + await writeFile(join(sourcePath, 'a.md'), 'second\n', 'utf-8') + const after = await digest.computeSha(ws, 'intent' as SourceId, 'a.md') + expect(before).not.toBe(after) + }) + + it('hashes a folder by walking its files; tolerates trailing slash on path', async () => { + const sourcePath = await makeRoot() + await mkdir(join(sourcePath, 'feature'), { recursive: true }) + await writeFile(join(sourcePath, 'feature', 'a.md'), 'A\n', 'utf-8') + await writeFile(join(sourcePath, 'feature', 'b.md'), 'B\n', 'utf-8') + + const ws = makeWorkspace({ rootPath: sourcePath, sources: [intentSource('intent', sourcePath)] }) + const digest = new FsSourceUnitDigest() + + const withSlash = await digest.computeSha(ws, 'intent' as SourceId, 'feature/') + const withoutSlash = await digest.computeSha(ws, 'intent' as SourceId, 'feature') + expect(withSlash).toBe(withoutSlash) + expect(withSlash).toMatch(/^[a-f0-9]{64}$/) + }) + + it('folder sha changes when any file inside changes', async () => { + const sourcePath = await makeRoot() + await mkdir(join(sourcePath, 'feature'), { recursive: true }) + await writeFile(join(sourcePath, 'feature', 'a.md'), 'A\n', 'utf-8') + const ws = makeWorkspace({ rootPath: sourcePath, sources: [intentSource('intent', sourcePath)] }) + const digest = new FsSourceUnitDigest() + + const before = await digest.computeSha(ws, 'intent' as SourceId, 'feature/') + await writeFile(join(sourcePath, 'feature', 'a.md'), 'A changed\n', 'utf-8') + const after = await digest.computeSha(ws, 'intent' as SourceId, 'feature/') + expect(before).not.toBe(after) + }) + + it('throws on non-filesystem source', async () => { + const ws = makeWorkspace({ + sources: [{ + kind: 'mcp', + id: 'mcp-src' as SourceId, + role: 'intent', + name: 'mcp', + mcpServerId: 'srv' as never, + }], + }) + const digest = new FsSourceUnitDigest() + await expect(digest.computeSha(ws, 'mcp-src' as SourceId, 'whatever')).rejects.toThrow() + }) +}) diff --git a/packages/server/test/infrastructure/fs/FsSourceUnitStateRepository.test.ts b/packages/server/test/infrastructure/fs/FsSourceUnitStateRepository.test.ts new file mode 100644 index 0000000..41eac01 --- /dev/null +++ b/packages/server/test/infrastructure/fs/FsSourceUnitStateRepository.test.ts @@ -0,0 +1,90 @@ +import type { AbsolutePath, SkillRunId, SourceId, SourceUnitSha, SourceUnitState, Timestamp, WorkspaceId } from '@braidhq/schema' +import { mkdtemp } from 'node:fs/promises' +import { tmpdir } from 'node:os' +import { join } from 'node:path' +import { describe, expect, it } from 'vitest' +import { FsSourceUnitStateRepository } from '../../../src/infrastructure/fs/FsSourceUnitStateRepository.js' + +async function makeRoot(): Promise { + return (await mkdtemp(join(tmpdir(), 'braid-source-unit-state-'))) as AbsolutePath +} + +function sha(byte: string): SourceUnitSha { + return byte.repeat(64) as SourceUnitSha +} + +function makeState(workspaceId: WorkspaceId, sourceId: string, path: string, hex: string): SourceUnitState { + return { + workspaceId, + sourceId: sourceId as SourceId, + path, + lastObservedSha: sha(hex), + lastObservedAt: '2026-06-08T00:00:00.000Z' as Timestamp, + lastObservedByRunId: 'run-1' as SkillRunId, + } +} + +function workspaceRootsClosure(root: AbsolutePath, workspaceId: WorkspaceId) { + return async () => new Map([[workspaceId, root]]) +} + +describe('FsSourceUnitStateRepository', () => { + it('find returns null before any write', async () => { + const root = await makeRoot() + const wsId = 'ws-1' as WorkspaceId + const repo = new FsSourceUnitStateRepository({ workspaceRoots: workspaceRootsClosure(root, wsId) }) + expect(await repo.find(wsId, 'src' as SourceId, 'foo.md')).toBeNull() + }) + + it('save then find round-trips one entry', async () => { + const root = await makeRoot() + const wsId = 'ws-1' as WorkspaceId + const repo = new FsSourceUnitStateRepository({ workspaceRoots: workspaceRootsClosure(root, wsId) }) + await repo.save(makeState(wsId, 'src', 'foo.md', 'a')) + const found = await repo.find(wsId, 'src' as SourceId, 'foo.md') + expect(found?.lastObservedSha).toBe(sha('a')) + }) + + it('save overwrites the existing entry for the same key', async () => { + const root = await makeRoot() + const wsId = 'ws-1' as WorkspaceId + const repo = new FsSourceUnitStateRepository({ workspaceRoots: workspaceRootsClosure(root, wsId) }) + await repo.save(makeState(wsId, 'src', 'foo.md', 'a')) + await repo.save(makeState(wsId, 'src', 'foo.md', 'b')) + const found = await repo.find(wsId, 'src' as SourceId, 'foo.md') + expect(found?.lastObservedSha).toBe(sha('b')) + }) + + it('listByWorkspace returns all entries across all sources', async () => { + const root = await makeRoot() + const wsId = 'ws-1' as WorkspaceId + const repo = new FsSourceUnitStateRepository({ workspaceRoots: workspaceRootsClosure(root, wsId) }) + await repo.save(makeState(wsId, 'src-a', 'foo.md', '1')) + await repo.save(makeState(wsId, 'src-a', 'bar/', '2')) + await repo.save(makeState(wsId, 'src-b', 'baz.md', '3')) + + const all = await repo.listByWorkspace(wsId) + expect(all).toHaveLength(3) + }) + + it('listBySource filters to one source', async () => { + const root = await makeRoot() + const wsId = 'ws-1' as WorkspaceId + const repo = new FsSourceUnitStateRepository({ workspaceRoots: workspaceRootsClosure(root, wsId) }) + await repo.save(makeState(wsId, 'src-a', 'foo.md', '1')) + await repo.save(makeState(wsId, 'src-b', 'bar.md', '2')) + + const a = await repo.listBySource(wsId, 'src-a' as SourceId) + expect(a).toHaveLength(1) + expect(a[0]!.sourceId).toBe('src-a') + }) + + it('handles folder units with trailing slash in path', async () => { + const root = await makeRoot() + const wsId = 'ws-1' as WorkspaceId + const repo = new FsSourceUnitStateRepository({ workspaceRoots: workspaceRootsClosure(root, wsId) }) + await repo.save(makeState(wsId, 'src', 'feature/', 'a')) + const found = await repo.find(wsId, 'src' as SourceId, 'feature/') + expect(found?.path).toBe('feature/') + }) +}) diff --git a/packages/server/test/infrastructure/fs/FsWorkspaceRepository.test.ts b/packages/server/test/infrastructure/fs/FsWorkspaceRepository.test.ts index 6305932..5f60d90 100644 --- a/packages/server/test/infrastructure/fs/FsWorkspaceRepository.test.ts +++ b/packages/server/test/infrastructure/fs/FsWorkspaceRepository.test.ts @@ -78,6 +78,20 @@ describe('FsWorkspaceRepository', () => { await expect(repository.load(rootPath)).rejects.toThrow(NotFoundError) }) + it('list skips registry entries whose workspace directory is missing', async () => { + const liveRoot = AbsolutePath.parse(await createWorkspaceDir({ name: 'alive' })) + const ghostRoot = AbsolutePath.parse('/tmp/braid-ghost-workspace-does-not-exist') + const registry = await makeRegistry() + // Stamp the stale entry directly; load+save would throw before saving. + await registry.add(ghostRoot) + await registry.add(liveRoot) + + const repository = new FsWorkspaceRepository({ registry }) + const all = await repository.list() + + expect(all.map(w => w.productManifest.name)).toEqual(['alive']) + }) + it('throws ValidationError when frontmatter invalid', async () => { const dir = await mkdtemp(join(tmpdir(), 'braid-ws-')) await writeFile(join(dir, 'PRODUCT.md'), '---\nname: ""\n---\n', 'utf-8') diff --git a/packages/server/test/routes.test.ts b/packages/server/test/routes.test.ts index 6047cf0..3f5202a 100644 --- a/packages/server/test/routes.test.ts +++ b/packages/server/test/routes.test.ts @@ -413,6 +413,7 @@ describe('list endpoints return their empty shape for a fresh workspace', () => { path: `/workspaces/${workspaceId}/nodes`, empty: { items: [] } }, { path: `/workspaces/${workspaceId}/edges`, empty: { items: [] } }, { path: `/workspaces/${workspaceId}/decisions`, empty: { items: [] } }, + { path: `/workspaces/${workspaceId}/source-unit-states`, empty: { items: [] } }, ] as const it.each(cases)('GET $path returns 200 + $empty', async ({ path, empty }) => { diff --git a/packages/studio/src/App.tsx b/packages/studio/src/App.tsx index 01dad3f..1bd51b1 100644 --- a/packages/studio/src/App.tsx +++ b/packages/studio/src/App.tsx @@ -14,6 +14,7 @@ import { WorkspaceDetailsSheet } from './components/WorkspaceDetailsSheet' import { useBatchStatus, useWorkspaces } from './lib/queries' import { useAuthGate } from './lib/useAuthGate' import { GraphNavigationContext } from './lib/useGraphNavigation' +import { useResetOnRemoteChange } from './lib/useRemoteWorkspaces' import { TabNavigationContext } from './lib/useTabNavigation' import { readUrl, useUrlSync } from './lib/useUrlState' import { useWorkspaceEvents } from './lib/useWorkspaceEvents' @@ -44,6 +45,7 @@ function BootScreen() { } function AppInner() { + useResetOnRemoteChange() const { data: workspaces } = useWorkspaces() // Initial state is hydrated from the URL so refresh / deep links land back // on the same workspace + surface. @@ -74,7 +76,7 @@ function AppInner() { useWorkspaceEvents(activeId) const { data: activeBatchPlan } = useBatchStatus(activeId ?? undefined) - const hasActiveBatch = activeBatchPlan?.status === 'running' || activeBatchPlan?.status === 'scanning' + const hasActiveBatch = activeBatchPlan?.status === 'running' || activeBatchPlan?.status === 'deriving' const items = workspaces?.items ?? [] diff --git a/packages/studio/src/components/BatchInFlightBanner.tsx b/packages/studio/src/components/BatchInFlightBanner.tsx index fd6f260..b58707a 100644 --- a/packages/studio/src/components/BatchInFlightBanner.tsx +++ b/packages/studio/src/components/BatchInFlightBanner.tsx @@ -28,8 +28,8 @@ export function BatchInFlightBanner({ workspaceId, onOpenBatch, suppress }: Batc let mode: Mode if (plan.status === 'running') mode = { kind: 'active', label: 'Bootstrap Running', completed, total } - else if (plan.status === 'scanning') - mode = { kind: 'active', label: 'Scanning Codebase…', completed, total } + else if (plan.status === 'deriving') + mode = { kind: 'active', label: 'Deriving Units…', completed, total } else if ((plan.status === 'failed' || plan.status === 'stopped') && unfinished) mode = { kind: 'resumable', completed, total } else diff --git a/packages/studio/src/components/ListRow.tsx b/packages/studio/src/components/ListRow.tsx index 812b3dc..e8e8dc4 100644 --- a/packages/studio/src/components/ListRow.tsx +++ b/packages/studio/src/components/ListRow.tsx @@ -13,6 +13,14 @@ interface ListRowProps { className?: string /** Forwarded to the native `title` attribute; useful for tooltip text when the row is icon-only. */ title?: string | undefined + /** + * Optional left-edge identity stripe. Used by the multi-server sidebar + * to mark which remote a workspace belongs to. Sits inside the row's + * `
  • ` alongside the active indicator so server identity stays + * visible even when the row isn't selected. + */ + stripeClassName?: string + stripeDim?: boolean children: ReactNode } @@ -21,7 +29,7 @@ interface ListRowProps { * left edge. Used by every selectable list in Studio so the visual language * (hover transition, active bg, bar position) stays consistent. */ -export function ListRow({ active, onClick, variant = 'content', className, title, children }: ListRowProps) { +export function ListRow({ active, onClick, variant = 'content', className, title, stripeClassName, stripeDim, children }: ListRowProps) { const tokens = variant === 'sidebar' ? { bar: 'inset-y-1', @@ -40,8 +48,19 @@ export function ListRow({ active, onClick, variant = 'content', className, title } return (
  • + {stripeClassName && ( + + )} {active && ( - + )} - - )} -
      - {workspaces.length === 0 && !collapsed && ( -
    • No workspace yet.
    • - )} - {workspaces.map(ws => ( - onSelect(ws.id)} - {...(collapsed ? { title: ws.id, className: 'justify-center px-0 py-1' } : {})} - > - {collapsed - ? ( - - ) - : ( - <> - - {ws.id} - - - - )} - - ))} -
    - {collapsed && ( -
    - -
    - )} + {remoteResults.map(result => ( + + ))} {activeWorkspaceId && ( @@ -196,9 +184,6 @@ export function Sidebar({ ) : ( <> - {/* Empty flex spacer reserves the left of the utility row - for a future user / account avatar; without it the - icons would drift to centre when account is absent. */}
    @@ -218,6 +203,278 @@ export function Sidebar({ ) } +function RemoteSection({ + result, + collapsed, + activeWorkspaceId, + activeRemoteId, + onSelectWorkspace, + onOpenDetails, + onOpenAdd, + onSignIn, +}: { + result: RemoteWorkspacesResult + collapsed: boolean + activeWorkspaceId: string | null + activeRemoteId: string + onSelectWorkspace: (remote: RemoteSummary, workspaceId: string) => void + onOpenDetails: (workspaceId: string) => void + onOpenAdd: (remote: RemoteSummary) => void + onSignIn: (remote: RemoteSummary) => void +}) { + const { remote, state } = result + const isActiveRemote = remote.id === activeRemoteId + const stripe = remoteStripeClass(remote) + const Icon = remote.isLocal ? Laptop : Globe + + return ( +
    + {!collapsed && ( +
    +
    + + + {remote.name} + +
    + {state.kind === 'ok' && ( + + )} +
    + )} + + {collapsed && ( +
    + + +
    + + {remote.name} + +
    + )} + + +
    + ) +} + +function RemoteContent({ + state, + remote, + stripe, + collapsed, + isActiveRemote, + activeWorkspaceId, + onSelectWorkspace, + onOpenDetails, + onOpenAdd, + onSignIn, +}: { + state: RemoteWorkspacesResult['state'] + remote: RemoteSummary + stripe: string + collapsed: boolean + isActiveRemote: boolean + activeWorkspaceId: string | null + onSelectWorkspace: (remote: RemoteSummary, workspaceId: string) => void + onOpenDetails: (workspaceId: string) => void + onOpenAdd: (remote: RemoteSummary) => void + onSignIn: (remote: RemoteSummary) => void +}) { + if (state.kind === 'loading') { + if (collapsed) + return null + return ( +
    Loading…
    + ) + } + + if (state.kind === 'unauthenticated') { + if (collapsed) { + return ( +
    + + + + + {`Sign in to ${remote.name}`} + +
    + ) + } + return ( + + ) + } + + if (state.kind === 'error') { + if (collapsed) + return null + return ( +
    + Unreachable +
    + ) + } + + const workspaces = state.workspaces + if (workspaces.length === 0) { + if (collapsed) { + return ( +
    + + + + + {`Open workspace on ${remote.name}`} + +
    + ) + } + return ( +
    No workspace yet.
    + ) + } + + return ( +
      + {workspaces.map(ws => ( + onSelectWorkspace(remote, ws.id)} + onOpenDetails={() => onOpenDetails(ws.id)} + /> + ))} +
    + ) +} + +function WorkspaceRow({ + workspace, + remote, + stripe, + collapsed, + isActiveRemote, + active, + onClick, + onOpenDetails, +}: { + workspace: Workspace + remote: RemoteSummary + stripe: string + collapsed: boolean + isActiveRemote: boolean + active: boolean + onClick: () => void + onOpenDetails: () => void +}) { + // Inactive remotes get a dimmer stripe so the active server still + // reads first while server identity stays visible across all rows. + return ( + + {collapsed + ? ( + isActiveRemote + ? ( + + ) + : ( + + ) + ) + : ( + <> + + {workspace.id} + {isActiveRemote && } + { + e.stopPropagation() + onOpenDetails() + }} + onKeyDown={(e) => { + if (e.key === 'Enter' || e.key === ' ') { + e.stopPropagation() + onOpenDetails() + } + }} + className="ml-1 hidden rounded p-0.5 text-sidebar-foreground/40 hover:bg-sidebar-accent hover:text-sidebar-foreground group-hover:inline-flex" + title="Details" + > + ⋯ + + + )} + + ) +} + function SidebarIconButton({ onClick, title, children }: { onClick: () => void title: string @@ -248,25 +505,6 @@ function ThemeToggle() { ) } -function ActiveServerLabel({ collapsed }: { collapsed: boolean }) { - const remotes = useRemotes() - const activeId = useActiveRemoteId() - if (remotes.length === 0 || collapsed) - return null - const isLocal = activeId === LOCAL_REMOTE_ID - const activeName = isLocal - ? 'Local' - : remotes.find(r => r.id === activeId)?.name ?? 'Local' - const Icon = isLocal ? Laptop : Globe - return ( -
    - - Server: - {activeName} -
    - ) -} - function HereSection({ workspaceId, activeSurface, @@ -286,21 +524,11 @@ function HereSection({ const { data: skills } = useSkills(workspaceId) const pendingProposals = proposals?.items.length ?? 0 const pendingClarify = clarify?.items.length ?? 0 - // HITL tabs are hidden when the viewer can't act on them. The server - // 403-gates the same routes, so devtools tampering changes nothing. const canSeeProposals = policy.can('proposal.read') const canSeeClarify = policy.can('clarify.read') - // Actions tab is visible when the viewer can run at least one - // workspace skill. Owners/maintainers trivially can; guests get the - // tab only when a skill's allowedRoles or their per-member override - // gives them something runnable (e.g. braid-ask in the default - // catalog). const canRunActions = (skills?.items ?? []).some(s => !s.frontmatter.braid.hidden && policy.can('skill.run', { skill: s.frontmatter, skillId: s.id }), ) - // History page exposes Tag / Restore actions which are owner-only. - // Even though the server 403s, showing the buttons to guests is - // misleading; hide the whole tab for them. const canSeeHistory = policy.effectiveRole !== null && policy.effectiveRole !== 'guest' return ( @@ -424,9 +652,6 @@ function HereRow({ collapsed, icon: Icon, label, active, count = 0, shortcut, on ) : ( <> - {/* size-5 wrapper so the icon column matches the workspace - swatch column above (both 20px wide), keeping label - x-positions aligned across both sections. */}
    @@ -454,13 +679,6 @@ function HereRow({ collapsed, icon: Icon, label, active, count = 0, shortcut, on } function WorkspaceBadges({ workspaceId }: { workspaceId: string }) { - // For the active workspace, useWorkspaceEvents keeps these query keys - // live. For inactive workspaces the counts are slightly stale until - // the user opens it; acceptable cost to avoid N concurrent SSEs. - // Three kinds (in-flight runs, pending clarifies, pending proposals) - // aggregate into a single number; the active workspace's HERE - // section is where the per-surface breakdown lives. A tooltip keeps - // the breakdown a hover away for the inactive-workspace case. const { data: proposals } = usePendingProposals(workspaceId) const { data: clarify } = usePendingClarify(workspaceId) const { data: runs } = useRuns(workspaceId) diff --git a/packages/studio/src/lib/api.ts b/packages/studio/src/lib/api.ts index 2220994..a1134fc 100644 --- a/packages/studio/src/lib/api.ts +++ b/packages/studio/src/lib/api.ts @@ -34,10 +34,16 @@ import type { } from '@braidhq/schema' import { getAuthToken } from './authToken.js' import { getCurrentUserId } from './currentUser.js' -import { getServerUrl } from './serverUrl.js' +import { getTokenFor } from './remotes.js' +import { getServerUrl, getServerUrlFor } from './serverUrl.js' export function workspaceEventsUrl(workspaceId: string): string { - return `${getServerUrl()}/workspaces/${workspaceId}/events` + // EventSource cannot send custom headers, so the Bearer token is + // appended as `?token=...` and matched server-side by the auth + // middleware for SSE paths only. + const base = `${getServerUrl()}/workspaces/${workspaceId}/events` + const token = getAuthToken() + return token ? `${base}?token=${encodeURIComponent(token)}` : base } export interface ItemList { items: T[] } @@ -126,9 +132,8 @@ export class ApiError extends Error { } } -async function fetchJson(path: string, init?: RequestInit): Promise { - const token = getAuthToken() - const response = await fetch(`${getServerUrl()}${path}`, { +async function rawFetch(baseUrl: string, token: string | null, path: string, init?: RequestInit): Promise { + const response = await fetch(`${baseUrl}${path}`, { ...init, headers: { 'Content-Type': 'application/json', @@ -158,6 +163,19 @@ async function fetchJson(path: string, init?: RequestInit): Promise { return response.json() as Promise } +async function fetchJson(path: string, init?: RequestInit): Promise { + return rawFetch(getServerUrl(), getAuthToken(), path, init) +} + +/** + * Like `fetchJson` but targets an explicit remote regardless of the active + * one. The sidebar uses this to enumerate workspaces across every + * configured server without disturbing the active singleton. + */ +async function fetchJsonAt(remoteId: string, path: string, init?: RequestInit): Promise { + return rawFetch(getServerUrlFor(remoteId), getTokenFor(remoteId), path, init) +} + export interface AuthConfig { googleEnabled: boolean studioUrl: string @@ -224,10 +242,10 @@ export const api = { }), listWorkspaces: () => fetchJson>('/workspaces'), + listWorkspacesAt: (remoteId: string) => + fetchJsonAt>(remoteId, '/workspaces'), getWorkspace: (workspaceId: string) => fetchJson(`/workspaces/${workspaceId}`), - registerWorkspace: (rootPath: string) => - fetchJson('/workspaces', { method: 'POST', body: JSON.stringify({ rootPath }) }), scaffoldWorkspace: (name: string, manifest: ProductManifestDraft) => fetchJson('/workspaces/scaffold', { method: 'POST', @@ -382,8 +400,11 @@ export const api = { listRuns: (workspaceId: string) => fetchJson>(`/workspaces/${workspaceId}/runs`), - runEventsUrl: (workspaceId: string, runId: string) => - `${getServerUrl()}/workspaces/${workspaceId}/runs/${runId}/events`, + runEventsUrl: (workspaceId: string, runId: string) => { + const base = `${getServerUrl()}/workspaces/${workspaceId}/runs/${runId}/events` + const token = getAuthToken() + return token ? `${base}?token=${encodeURIComponent(token)}` : base + }, cancelRun: (workspaceId: string, runId: string) => fetchJson(`/workspaces/${workspaceId}/runs/${runId}/cancel`, { method: 'POST' }), forgetSession: (workspaceId: string, sessionId: string) => diff --git a/packages/studio/src/lib/useRemoteWorkspaces.ts b/packages/studio/src/lib/useRemoteWorkspaces.ts new file mode 100644 index 0000000..f3d3f33 --- /dev/null +++ b/packages/studio/src/lib/useRemoteWorkspaces.ts @@ -0,0 +1,100 @@ +import type { Workspace } from '@braidhq/schema' +import { useQueries, useQueryClient } from '@tanstack/react-query' +import { useEffect, useRef } from 'react' +import { api, ApiError } from './api' +import { getTokenFor, LOCAL_REMOTE_ID, useActiveRemoteId, useRemotes } from './remotes' +import { getServerUrlFor } from './serverUrl' + +export interface RemoteSummary { + id: string + name: string + url: string + isLocal: boolean +} + +export type RemoteWorkspacesState = + | { kind: 'loading' } + | { kind: 'ok', workspaces: Workspace[] } + | { kind: 'unauthenticated' } + | { kind: 'error', message: string } + +export interface RemoteWorkspacesResult { + remote: RemoteSummary + state: RemoteWorkspacesState +} + +export interface ClassifyInput { + hasToken: boolean + isPending: boolean + error: unknown + data: { items: Workspace[] } | undefined +} + +/** + * Pure classifier extracted so the state-machine for "no token / loading + * / 401 / network / ok" can be tested without standing up react-query. + * A 401 collapses to `unauthenticated` because the user's recourse is + * the same as having no token at all: sign in. + */ +export function classifyRemoteResult(remote: RemoteSummary, input: ClassifyInput): RemoteWorkspacesResult { + if (!remote.isLocal && !input.hasToken) + return { remote, state: { kind: 'unauthenticated' } } + if (input.isPending) + return { remote, state: { kind: 'loading' } } + if (input.error) { + if (input.error instanceof ApiError && input.error.status === 401) + return { remote, state: { kind: 'unauthenticated' } } + const message = input.error instanceof Error ? input.error.message : 'Unreachable' + return { remote, state: { kind: 'error', message } } + } + return { remote, state: { kind: 'ok', workspaces: input.data?.items ?? [] } } +} + +/** + * Fetch `/workspaces` from every configured remote in parallel. Local is + * always queried (X-Braid-User fallback covers the no-token sidecar + * case); remotes without a stored token short-circuit to + * `unauthenticated` so the sidebar can render a Sign in affordance + * without ever issuing a doomed request. + */ +export function useAllRemoteWorkspaces(): RemoteWorkspacesResult[] { + const remotes = useRemotes() + const all: RemoteSummary[] = [ + { id: LOCAL_REMOTE_ID, name: 'Local', url: getServerUrlFor(LOCAL_REMOTE_ID), isLocal: true }, + ...remotes.map(r => ({ id: r.id, name: r.name, url: r.url, isLocal: false })), + ] + const queries = useQueries({ + queries: all.map(remote => ({ + queryKey: ['workspaces-at', remote.id] as const, + queryFn: () => api.listWorkspacesAt(remote.id), + retry: false, + enabled: remote.isLocal || getTokenFor(remote.id) != null, + })), + }) + return all.map((remote, i) => { + const query = queries[i]! + return classifyRemoteResult(remote, { + hasToken: getTokenFor(remote.id) != null, + isPending: query.isPending, + error: query.error, + data: query.data, + }) + }) +} + +/** + * Clears the react-query cache whenever the active remote flips. Without + * this every workspace-scoped query (proposals, clarify, history…) would + * return stale data from the previous server until its own TTL expired. + */ +export function useResetOnRemoteChange(): void { + const queryClient = useQueryClient() + const activeId = useActiveRemoteId() + const prevRef = useRef(activeId) + useEffect(() => { + if (prevRef.current !== activeId) { + queryClient.clear() + prevRef.current = activeId + } + }, [activeId, queryClient]) +} diff --git a/packages/studio/src/lib/useWorkspaceEvents.ts b/packages/studio/src/lib/useWorkspaceEvents.ts index 93a6e9d..e86defe 100644 --- a/packages/studio/src/lib/useWorkspaceEvents.ts +++ b/packages/studio/src/lib/useWorkspaceEvents.ts @@ -30,6 +30,9 @@ interface WorkspaceEvent { | 'batch.completed' | 'batch.stopped' | 'batch.failed' + | 'batch.checkpoint.started' + | 'batch.checkpoint.completed' + | 'batch.checkpoint.failed' } /** @@ -122,6 +125,14 @@ export function useWorkspaceEvents(workspaceId: string | null): void { source.addEventListener('batch.completed', invalidateBatch) source.addEventListener('batch.stopped', invalidateBatch) source.addEventListener('batch.failed', invalidateBatch) + source.addEventListener('batch.checkpoint.started', invalidateBatch) + source.addEventListener('batch.checkpoint.completed', () => { + // Model run cross-links nodes & runs validators — graph can shift. + invalidateBatch() + invalidateProposals() + invalidateGraph() + }) + source.addEventListener('batch.checkpoint.failed', invalidateBatch) return () => { source.close() diff --git a/packages/studio/src/pages/Batch.tsx b/packages/studio/src/pages/Batch.tsx index 68cab72..146e72f 100644 --- a/packages/studio/src/pages/Batch.tsx +++ b/packages/studio/src/pages/Batch.tsx @@ -19,7 +19,7 @@ const EXTRACT_SKILL_ID = 'braid-extract' const STATUS_TONE: Record = { idle: 'border-zinc-400/40 bg-zinc-400/10 text-zinc-700 dark:text-zinc-300', - scanning: 'border-violet-500/40 bg-violet-500/10 text-violet-700 dark:text-violet-300', + deriving: 'border-violet-500/40 bg-violet-500/10 text-violet-700 dark:text-violet-300', running: 'border-sky-500/40 bg-sky-500/10 text-sky-700 dark:text-sky-300', completed: 'border-emerald-500/40 bg-emerald-500/10 text-emerald-700 dark:text-emerald-300', failed: 'border-rose-500/40 bg-rose-500/10 text-rose-700 dark:text-rose-300', @@ -193,7 +193,7 @@ function ActiveBatch({ workspaceId, plan }: { workspaceId: string, plan: BatchPl {terminal && }
    u.status === 'completed').length const hasUnfinished = plan.units.some(u => u.status === 'failed' || u.status === 'pending') + const latestCheckpointPhase = plan.checkpointPhases[plan.checkpointPhases.length - 1] + const checkpointRunning = latestCheckpointPhase?.status === 'running' + const headerLabel = plan.status === 'deriving' + ? 'Deriving Units…' + : terminal + ? 'Bootstrap Report' + : checkpointRunning + ? 'Running Checkpoint…' + : 'Processing Units…' return (
    @@ -229,13 +238,7 @@ function BatchHeader({ workspaceId, plan, terminal }: { {terminal ? : } -

    - {plan.status === 'scanning' - ? 'Scanning Codebase…' - : terminal - ? 'Bootstrap Report' - : 'Running Bootstrap…'} -

    +

    {headerLabel}

    {plan.status} @@ -243,8 +246,16 @@ function BatchHeader({ workspaceId, plan, terminal }: { {completed} {' / '} {plan.units.length} - {' units'} + {' units processed'} + {plan.checkpointPhases.length > 0 && ( + + {plan.checkpointPhases.filter(p => p.status === 'completed').length} + {' / '} + {plan.checkpointPhases.length} + {' checkpoints'} + + )}
    {terminal @@ -305,29 +316,239 @@ function ReportBar({ plan }: { plan: BatchPlan }) { ) } -function UnitList({ units, selectedRunId, activeRunId, onSelect }: { - units: readonly PlanUnit[] +function UnitList({ plan, selectedRunId, activeRunId, onSelect }: { + plan: BatchPlan selectedRunId: SkillRunId | null activeRunId: SkillRunId | null onSelect: (runId: SkillRunId | null) => void }) { + const chunks = useMemo(() => groupUnitsByCheckpoint(plan), [plan]) + const isTerminal = plan.status === 'completed' || plan.status === 'failed' || plan.status === 'stopped' || plan.status === 'archived' + + const perUnitLabel = plan.batchPolicy?.perUnitLabel ?? plan.batchPolicy?.perUnitSkillId + const checkpointLabel = plan.batchPolicy?.checkpointLabel ?? plan.batchPolicy?.checkpointSkillId return ( -
      - {units.map(unit => ( - onSelect(unit.skillRunId ?? null)} +
      + {chunks.map((chunk, idx) => ( + ))} -
    +
    + ) +} + +interface UnitChunk { + units: readonly PlanUnit[] + phase: BatchPlan['checkpointPhases'][number] | undefined + isFinal: boolean +} + +/** + * Build the visual chunks. For each committed checkpoint phase we + * group the units it consumed. Remaining units are split into + * anticipated chunks using `batchPolicy.checkpointChunkSize` so the + * reviewer sees the structure before any checkpoint fires. + * + * When `checkpointRunAtEnd` is true we also surface the mandatory + * final pass even if units happen to divide evenly — the orchestrator + * will fire one more checkpoint with an empty unit list in that case. + */ +function groupUnitsByCheckpoint(plan: BatchPlan): UnitChunk[] { + const unitsById = new Map(plan.units.map(u => [u.id, u])) + const consumed = new Set() + const chunks: UnitChunk[] = [] + for (const phase of plan.checkpointPhases) { + const units: PlanUnit[] = [] + for (const id of phase.unitIds) { + const unit = unitsById.get(id) + if (unit) { + units.push(unit) + consumed.add(id) + } + } + chunks.push({ units, phase, isFinal: false }) + } + const leftovers = plan.units.filter(u => !consumed.has(u.id)) + const chunkSize = plan.batchPolicy?.checkpointChunkSize + const runAtEnd = plan.batchPolicy?.checkpointRunAtEnd ?? false + if (chunkSize && chunkSize > 0) { + for (let i = 0; i < leftovers.length; i += chunkSize) { + const slice = leftovers.slice(i, i + chunkSize) + const isLastSlice = i + chunkSize >= leftovers.length + chunks.push({ units: slice, phase: undefined, isFinal: isLastSlice && runAtEnd }) + } + if (runAtEnd && leftovers.length > 0 && leftovers.length % chunkSize === 0) + chunks.push({ units: [], phase: undefined, isFinal: true }) + } + else if (leftovers.length > 0) { + chunks.push({ units: leftovers, phase: undefined, isFinal: runAtEnd }) + } + return chunks +} + +type PhaseTone = 'completed' | 'failed' | 'running' | 'idle' + +function aggregateUnitsTone(units: readonly PlanUnit[]): PhaseTone { + if (units.length === 0) + return 'idle' + if (units.some(u => u.status === 'failed')) + return 'failed' + if (units.some(u => u.status === 'running')) + return 'running' + return units.every(u => u.status === 'completed') ? 'completed' : 'idle' +} + +function ChunkSection({ chunkIndex, chunk, isTerminal, perUnitLabel, checkpointLabel, selectedRunId, activeRunId, onSelect }: { + chunkIndex: number + chunk: UnitChunk + isTerminal: boolean + perUnitLabel: string | undefined + checkpointLabel: string | undefined + selectedRunId: SkillRunId | null + activeRunId: SkillRunId | null + onSelect: (runId: SkillRunId | null) => void +}) { + const completedUnits = chunk.units.filter(u => u.status === 'completed').length + const groupTone = aggregateUnitsTone(chunk.units) + return ( + <> + +
      + {chunk.units.map(unit => ( + onSelect(unit.skillRunId ?? null)} + /> + ))} +
    + { + const base = checkpointLabel ? `${checkpointLabel} • Checkpoint ${chunkIndex + 1}` : `Checkpoint ${chunkIndex + 1}` + return chunk.isFinal && !chunk.phase ? `${base} (final)` : base + })()} + badge={chunk.phase?.status === 'completed' ? '1 / 1' : '0 / 1'} + tone={chunk.phase + ? (chunk.phase.status === 'running' ? 'running' : chunk.phase.status === 'completed' ? 'completed' : 'failed') + : 'idle'} + /> +
      + {chunk.phase + ? ( + onSelect(chunk.phase?.skillRunId ?? null)} + /> + ) + : ( +
    • + {isTerminal + ? 'No checkpoint recorded for these units.' + : chunk.units.length === 0 + ? 'Final validation pass; runs after the last unit checkpoint.' + : 'Will run once these units finish.'} +
    • + )} +
    + + ) +} + +function PhaseSectionHeader({ title, badge, tone = 'idle' }: { + title: string + badge?: string + tone?: 'idle' | 'running' | 'completed' | 'failed' +}) { + return ( +
    + {title} + {badge && ( + + {badge} + + )} +
    ) } -function UnitRow({ unit, active, selected, onSelect }: { +const CHECKPOINT_PHASE_ICON: Record = { + running: , + completed: , + failed: , +} + +function CheckpointPhaseRow({ phase, label, active, selected, onSelect }: { + phase: BatchPlan['checkpointPhases'][number] + label: string | undefined + active: boolean + selected: boolean + onSelect: () => void +}) { + const inspectable = !!phase.skillRunId + return ( +
  • + +
  • + ) +} + +function UnitRow({ unit, actionLabel, active, selected, onSelect }: { unit: PlanUnit + actionLabel: string | undefined active: boolean selected: boolean onSelect: () => void @@ -350,11 +571,13 @@ function UnitRow({ unit, active, selected, onSelect }: { >
    {UNIT_ICON[unit.status]}
    -
    +
    + {actionLabel && ( + + {actionLabel} + + )} {unit.name} - - {unit.status} -
    {(unit.proposalIds.length > 0 || unit.clarifyTicketIds.length > 0) && (
    diff --git a/packages/studio/src/pages/History.tsx b/packages/studio/src/pages/History.tsx index 771df96..c968d2b 100644 --- a/packages/studio/src/pages/History.tsx +++ b/packages/studio/src/pages/History.tsx @@ -33,6 +33,7 @@ const KIND_LABEL: Record = { 'restore': 'Restore', 'snapshot': 'Snapshot', 'initial': 'Initial', + 'batch-archive': 'Archive', } const KIND_TONE: Record = { @@ -48,6 +49,7 @@ const KIND_TONE: Record = { 'restore': 'border-orange-500/40 bg-orange-500/10 text-orange-700 dark:text-orange-300', 'snapshot': 'border-fuchsia-500/40 bg-fuchsia-500/10 text-fuchsia-700 dark:text-fuchsia-300', 'initial': 'border-zinc-400/40 bg-zinc-400/10 text-zinc-600 dark:text-zinc-400', + 'batch-archive': 'border-zinc-500/40 bg-zinc-500/10 text-zinc-700 dark:text-zinc-300', } export function HistoryPage({ workspaceId }: HistoryPageProps) { diff --git a/packages/studio/test/lib/useRemoteWorkspaces.test.ts b/packages/studio/test/lib/useRemoteWorkspaces.test.ts new file mode 100644 index 0000000..09ea1fc --- /dev/null +++ b/packages/studio/test/lib/useRemoteWorkspaces.test.ts @@ -0,0 +1,83 @@ +import type { Workspace } from '@braidhq/schema' +import { describe, expect, it } from 'vitest' +import { ApiError } from '../../src/lib/api' +import { classifyRemoteResult, type RemoteSummary } from '../../src/lib/useRemoteWorkspaces' + +const LOCAL: RemoteSummary = { id: 'local', name: 'Local', url: 'http://localhost:4321', isLocal: true } +const REMOTE: RemoteSummary = { id: 'r-team', name: 'team', url: 'https://team.example.com', isLocal: false } + +function ws(id: string): Workspace { + return { id, rootPath: '/tmp', members: [] } as unknown as Workspace +} + +describe('classifyRemoteResult', () => { + it('skips remotes that have no stored token before considering the query state', () => { + const result = classifyRemoteResult(REMOTE, { + hasToken: false, + isPending: false, + error: undefined, + data: undefined, + }) + expect(result.state.kind).toBe('unauthenticated') + }) + + it('does not skip Local when there is no token, since X-Braid-User covers local trust mode', () => { + const result = classifyRemoteResult(LOCAL, { + hasToken: false, + isPending: false, + error: undefined, + data: { items: [ws('braid')] }, + }) + expect(result.state).toEqual({ kind: 'ok', workspaces: [ws('braid')] }) + }) + + it('reports loading before the first response settles', () => { + const result = classifyRemoteResult(LOCAL, { + hasToken: true, + isPending: true, + error: undefined, + data: undefined, + }) + expect(result.state.kind).toBe('loading') + }) + + it('collapses 401 to unauthenticated so the sidebar prompts a re-sign-in', () => { + const result = classifyRemoteResult(REMOTE, { + hasToken: true, + isPending: false, + error: new ApiError('Unauthorized', 401), + data: undefined, + }) + expect(result.state.kind).toBe('unauthenticated') + }) + + it('surfaces non-401 errors with their message so the user can see what is wrong', () => { + const result = classifyRemoteResult(REMOTE, { + hasToken: true, + isPending: false, + error: new Error('Network unreachable'), + data: undefined, + }) + expect(result.state).toEqual({ kind: 'error', message: 'Network unreachable' }) + }) + + it('returns the workspace list on success', () => { + const result = classifyRemoteResult(REMOTE, { + hasToken: true, + isPending: false, + error: undefined, + data: { items: [ws('alpha'), ws('beta')] }, + }) + expect(result.state).toEqual({ kind: 'ok', workspaces: [ws('alpha'), ws('beta')] }) + }) + + it('treats missing data as an empty workspace list rather than crashing', () => { + const result = classifyRemoteResult(REMOTE, { + hasToken: true, + isPending: false, + error: undefined, + data: undefined, + }) + expect(result.state).toEqual({ kind: 'ok', workspaces: [] }) + }) +}) diff --git a/packages/test-utils/src/ontology.ts b/packages/test-utils/src/ontology.ts index 6709174..b44c8ff 100644 --- a/packages/test-utils/src/ontology.ts +++ b/packages/test-utils/src/ontology.ts @@ -1,6 +1,7 @@ import type { EdgeTypeDescriptor, NodeTypeDescriptor, + OntologyBatchBinding, OntologyPlugin, OntologyValidator, } from '@braidhq/core' @@ -13,6 +14,7 @@ export interface MakeOntologyOptions { readonly nodeTypes?: readonly NodeTypeDescriptor[] readonly edgeTypes?: readonly EdgeTypeDescriptor[] readonly validators?: readonly OntologyValidator[] + readonly batch?: OntologyBatchBinding } /** @@ -34,5 +36,6 @@ export function makeOntology(opts: MakeOntologyOptions = {}): OntologyPlugin { nodeTypes: opts.nodeTypes ?? [], edgeTypes: opts.edgeTypes ?? [], validators: opts.validators ?? [], + ...(opts.batch ? { batch: opts.batch } : {}), } }