Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
331 changes: 283 additions & 48 deletions packages/core/src/application/BatchService.ts

Large diffs are not rendered by default.

26 changes: 4 additions & 22 deletions packages/core/src/application/HITLService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import type {
GraphOperation,
ProposalDraft,
ProposalId,
UserId,
ValidationIssue,
WorkspaceId,
} from '@braidhq/schema'
Expand All @@ -26,15 +25,17 @@ import type { Workspace } from '../domain/workspace/Workspace.js'
import type { ValidationService } from './ValidationService.js'
import type { WorkspaceEventBus } from './WorkspaceEventBus.js'
import type { WorkspaceService } from './WorkspaceService.js'
import { UserId } from '@braidhq/schema'
import { ValidationError } from '../domain/errors.js'
import { ClarifyTicket } from '../domain/hitl/ClarifyTicket.js'
import { Proposal } from '../domain/hitl/Proposal.js'
import { newClarifyCandidateId, newClarifyTicketId, newDecisionId, newProposalId } from '../domain/ids.js'
import { noopUserDirectory } from '../domain/users/UserDirectory.js'
import { enrichCommitAuthor } from './enrichCommitAuthor.js'
import { PerWorkspaceLock } from './PerWorkspaceLock.js'

// Generic system author for submit commits until Theme 13 (account management) supplies real per-user attribution.
const SUBMIT_USER_ID = 'braid-skill' as UserId
const SUBMIT_USER_ID = UserId.parse('braid-skill')

export interface HITLServiceDeps {
proposalRepository: ProposalRepository
Expand Down Expand Up @@ -365,7 +366,7 @@ export class HITLService {
const snapshot = await this.deps.modelRepository.load(workspace.id)
await this.deps.graphSerializer.write(workspace, snapshot)
}
const enriched = await this.enrichAuthor(message)
const enriched = await enrichCommitAuthor(message, this.userDirectory)
const sha = await this.deps.history.commit(workspace, enriched)
this.deps.eventBus?.publish({
type: 'history.committed',
Expand All @@ -374,23 +375,4 @@ export class HITLService {
at: this.deps.clock.now(),
})
}

/**
* Snapshot the user's displayName + email into the commit so git
* stores the real Google identity. Skips when `userDirectory`
* doesn't know the userId — bootstrap / skill-system authors lack
* a row and fall through to the placeholder synth in the git layer.
*/
private async enrichAuthor(message: CommitMessage): Promise<CommitMessage> {
if (message.authorName !== undefined || message.authorEmail !== undefined)
return message
const author = await this.userDirectory.resolve(message.userId)
if (!author)
return message
return {
...message,
authorName: author.displayName,
...(author.email ? { authorEmail: author.email } : {}),
}
}
}
21 changes: 21 additions & 0 deletions packages/core/src/application/HistoryService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import type { WorkspaceService } from './WorkspaceService.js'
import { diffSnapshots } from '@braidhq/schema'
import { ConflictError } from '../domain/errors.js'
import { noopUserDirectory } from '../domain/users/UserDirectory.js'
import { enrichCommitAuthor } from './enrichCommitAuthor.js'

export interface HistoryServiceDeps {
history: WorkspaceHistory
Expand Down Expand Up @@ -111,6 +112,26 @@ export class HistoryService {
return this.deps.history.tag(workspace, sha, name, note)
}

/**
* Commit whatever artifact changes a caller has just persisted to
* disk, with the supplied commit message. The caller is responsible
* for holding the per-workspace lock (so the commit isn't racing a
* concurrent restore) and for having already written its file
* changes. `userId` is snapshotted into the git author line.
*/
async commitWorkspaceChange(workspaceId: WorkspaceId, message: CommitMessage): Promise<CommitSha> {
const workspace = await this.deps.workspaceService.findById(workspaceId)
const enriched = await enrichCommitAuthor(message, this.userDirectory)
const sha = await this.deps.history.commit(workspace, enriched)
this.deps.eventBus?.publish({
type: 'history.committed',
workspaceId: workspace.id,
sha,
at: this.deps.clock.now(),
})
return sha
}

async listTags(workspaceId: WorkspaceId): Promise<readonly TagMeta[]> {
const workspace = await this.deps.workspaceService.findById(workspaceId)
return this.deps.history.listTags(workspace)
Expand Down
67 changes: 67 additions & 0 deletions packages/core/src/application/SourceUnitStateService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import type { SkillRunId, SourceId, SourceUnitDiff, SourceUnitState, WorkspaceId } from '@braidhq/schema'
import type { Clock } from '../domain/Clock.js'
import type { SourceUnitDigest } from '../domain/source/SourceUnitDigest.js'
import type { SourceUnitStateRepository } from '../domain/source/SourceUnitStateRepository.js'
import type { WorkspaceService } from './WorkspaceService.js'
import { computeSourceUnitDiff } from '../domain/source/computeSourceUnitDiff.js'

export interface SourceUnitStateServiceDeps {
repository: SourceUnitStateRepository
digest: SourceUnitDigest
workspaceService: WorkspaceService
clock: Clock
}

/**
* Application-level entry point for recording and querying source unit
* observations. Every orchestrator that runs a skill against a source
* unit (BatchService today, ReactorService and manual-extract dispatch
* later) goes through `recordObservation` so the audit and diff
* primitives stay in one place.
*
* Reads (`listByWorkspace`, `diffAgainst`) are delegated to the
* repository / pure diff function. The service exists mostly to (a)
* fan-in writes with sha computation, and (b) give callers a stable
* surface even if persistence later moves to a database.
*/
export class SourceUnitStateService {
constructor(private readonly deps: SourceUnitStateServiceDeps) {}

async recordObservation(
workspaceId: WorkspaceId,
sourceId: SourceId,
path: string,
runId?: SkillRunId,
): Promise<SourceUnitState> {
const workspace = await this.deps.workspaceService.findById(workspaceId)
const sha = await this.deps.digest.computeSha(workspace, sourceId, path)
const state: SourceUnitState = {
workspaceId,
sourceId,
path,
lastObservedSha: sha,
lastObservedAt: this.deps.clock.now(),
...(runId ? { lastObservedByRunId: runId } : {}),
}
await this.deps.repository.save(state)
return state
}

async listByWorkspace(workspaceId: WorkspaceId): Promise<readonly SourceUnitState[]> {
return this.deps.repository.listByWorkspace(workspaceId)
}

async listBySource(workspaceId: WorkspaceId, sourceId: SourceId): Promise<readonly SourceUnitState[]> {
return this.deps.repository.listBySource(workspaceId, sourceId)
}

/**
* Compute the partition of `units` against what's currently recorded
* for this workspace. Used by Reactor to decide which units to
* re-extract; useful in tests for asserting batch progress.
*/
async diffAgainst(workspaceId: WorkspaceId, units: ReadonlyArray<{ sourceId: SourceId, path: string, sha: SourceUnitState['lastObservedSha'] }>): Promise<SourceUnitDiff> {
const states = await this.deps.repository.listByWorkspace(workspaceId)
return computeSourceUnitDiff(states, units)
}
}
24 changes: 24 additions & 0 deletions packages/core/src/application/enrichCommitAuthor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import type { CommitMessage } from '@braidhq/schema'
import type { UserDirectory } from '../domain/users/UserDirectory.js'

/**
* Resolve the human author for a commit and snapshot `displayName` /
* `email` into the message so a later rename of the user record can't
* retroactively rewrite git history. Pass-through when the caller has
* already populated those fields, or when the directory has no record.
*/
export async function enrichCommitAuthor(
message: CommitMessage,
userDirectory: UserDirectory,
): Promise<CommitMessage> {
if (message.authorName !== undefined || message.authorEmail !== undefined)
return message
const author = await userDirectory.resolve(message.userId)
if (!author)
return message
return {
...message,
authorName: author.displayName,
...(author.email ? { authorEmail: author.email } : {}),
}
}
62 changes: 53 additions & 9 deletions packages/core/src/domain/batch/BatchPlan.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type {
BatchCheckpointPhase,
BatchInputMode,
BatchPlan as BatchPlanData,
BatchPlanId,
Expand All @@ -8,6 +9,7 @@ import type {
PlanUnit,
PlanUnitId,
ProposalId,
SkillRunId,
Timestamp,
WorkspaceId,
} from '@braidhq/schema'
Expand All @@ -30,24 +32,25 @@ export class BatchPlan {
get units(): readonly PlanUnit[] { return this.data.units }
get running(): BatchRunning | undefined { return this.data.running }
get error(): string | undefined { return this.data.error }
get checkpointPhases(): readonly BatchCheckpointPhase[] { return this.data.checkpointPhases }
get createdAt(): Timestamp { return this.data.createdAt }
get updatedAt(): Timestamp { return this.data.updatedAt }

// mode='scan' goes idle → scanning → running. mode='intent' goes idle → running.
// mode='derive' goes idle → deriving → running. mode='intent' goes idle → running.
beginRun(now: Timestamp, baselineTag: string): BatchPlan {
if (this.data.status !== 'idle')
throw new ConflictError(`Batch plan ${this.data.id} is already ${this.data.status}`)
return this.with({
status: this.data.mode === 'scan' ? 'scanning' : 'running',
status: this.data.mode === 'derive' ? 'deriving' : 'running',
baselineTag,
updatedAt: now,
})
}

// Called when braid-scan finished writing units back into the plan.
// Called when the ontology's `deriveUnits` skill finishes writing units back into the plan.
promoteToRunning(now: Timestamp, units: readonly PlanUnit[]): BatchPlan {
if (this.data.status !== 'scanning')
throw new ConflictError(`Batch plan ${this.data.id} is not scanning (status=${this.data.status})`)
if (this.data.status !== 'deriving')
throw new ConflictError(`Batch plan ${this.data.id} is not deriving (status=${this.data.status})`)
return this.with({ status: 'running', units: [...units], updatedAt: now })
}

Expand Down Expand Up @@ -104,6 +107,41 @@ export class BatchPlan {
return this.with({ status: 'stopped', running: undefined, updatedAt: now })
}

startCheckpointPhase(now: Timestamp, skillRunId: SkillRunId, unitIds: readonly PlanUnitId[]): BatchPlan {
const newPhase: BatchCheckpointPhase = {
status: 'running',
unitIds: [...unitIds],
startedAt: now,
skillRunId,
}
return this.with({
checkpointPhases: [...this.data.checkpointPhases, newPhase],
updatedAt: now,
})
}

completeCheckpointPhase(now: Timestamp): BatchPlan {
return this.with({
checkpointPhases: this.mapLastCheckpointPhase(phase => ({ ...phase, status: 'completed', completedAt: now })),
updatedAt: now,
})
}

failCheckpointPhase(now: Timestamp, error: string): BatchPlan {
return this.with({
checkpointPhases: this.mapLastCheckpointPhase(phase => ({ ...phase, status: 'failed', completedAt: now, error })),
updatedAt: now,
})
}

private mapLastCheckpointPhase(fn: (phase: BatchCheckpointPhase) => BatchCheckpointPhase): BatchCheckpointPhase[] {
const arr = [...this.data.checkpointPhases]
if (arr.length === 0)
return arr
arr[arr.length - 1] = fn(arr[arr.length - 1]!)
return arr
}

// User-driven dismiss after reviewing the report. Allowed only from a
// terminal state so an in-flight batch can't be hidden by accident.
// Archived plans stay on disk; the UI treats them like "no active plan"
Expand All @@ -119,14 +157,20 @@ export class BatchPlan {
if (!this.isTerminal())
throw new ConflictError(`Cannot resume plan ${this.data.id} from status=${this.data.status}`)
const units = this.data.units.map(unit => unit.status === 'failed' || unit.status === 'pending' ? resetUnit(unit) : unit)
const patch: Partial<BatchPlanData> = {
// Drop failed checkpoint phases so the upcoming chunk accounting
// resets; successful phases stay because the units they consumed
// are still recorded as completed.
const checkpointPhases = this.data.checkpointPhases.filter(p => p.status === 'completed')
const next: BatchPlanData = {
...this.data,
status: 'running',
running: undefined,
units,
checkpointPhases,
updatedAt: now,
}
delete patch.error
return this.with(patch)
delete (next as { running?: BatchRunning }).running
delete (next as { error?: string }).error
return new BatchPlan(next)
}

toData(): BatchPlanData {
Expand Down
28 changes: 28 additions & 0 deletions packages/core/src/domain/events/WorkspaceEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ export type WorkspaceEvent =
| BatchCompletedEvent
| BatchStoppedEvent
| BatchFailedEvent
| BatchCheckpointStartedEvent
| BatchCheckpointCompletedEvent
| BatchCheckpointFailedEvent

export interface RunStartedEvent {
readonly type: 'run.started'
Expand Down Expand Up @@ -190,4 +193,29 @@ export interface BatchFailedEvent {
readonly at: string
}

export interface BatchCheckpointStartedEvent {
readonly type: 'batch.checkpoint.started'
readonly workspaceId: WorkspaceId
readonly planId: BatchPlanId
readonly skillRunId: SkillRunId
readonly at: string
}

export interface BatchCheckpointCompletedEvent {
readonly type: 'batch.checkpoint.completed'
readonly workspaceId: WorkspaceId
readonly planId: BatchPlanId
readonly skillRunId: SkillRunId
readonly at: string
}

export interface BatchCheckpointFailedEvent {
readonly type: 'batch.checkpoint.failed'
readonly workspaceId: WorkspaceId
readonly planId: BatchPlanId
readonly skillRunId: SkillRunId
readonly error: string
readonly at: string
}

export type WorkspaceEventType = WorkspaceEvent['type']
Loading
Loading