diff --git a/.changeset/agents-signal-stop-controls.md b/.changeset/agents-signal-stop-controls.md new file mode 100644 index 0000000000..c3a139cea2 --- /dev/null +++ b/.changeset/agents-signal-stop-controls.md @@ -0,0 +1,9 @@ +--- +'@electric-ax/agents-runtime': patch +'@electric-ax/agents-server': patch +'@electric-ax/agents-server-ui': patch +'@electric-ax/agents-server-conformance-tests': patch +'electric-ax': patch +--- + +Add durable entity signals and signal-driven stop controls for agents. The server, runtime, conformance tests, and CLI now use signal APIs, persist signal events, and let the UI send `SIGINT` to cancel active generations with pending stop feedback. diff --git a/packages/agents-runtime/src/agents-client.ts b/packages/agents-runtime/src/agents-client.ts index daea470202..b3969a4f84 100644 --- a/packages/agents-runtime/src/agents-client.ts +++ b/packages/agents-runtime/src/agents-client.ts @@ -3,6 +3,7 @@ import { createEntityStreamDB } from './entity-stream-db' import { normalizeObservationSchema } from './observation-schema' import { createRuntimeServerClient } from './runtime-server-client' import { appendPathToUrl } from './url' +import type { EntitySignal } from './runtime-server-client' import type { EntitiesObservationSource, EntityObservationSource, @@ -23,12 +24,26 @@ export interface AgentsClient { observe: ( source: ObservationSource ) => Promise + signal: (options: { + entityUrl: string + signal: EntitySignal + reason?: string + payload?: unknown + }) => Promise<{ txid: number }> + kill: (entityUrl: string, reason?: string) => Promise<{ txid: number }> } export function createAgentsClient(config: AgentsClientConfig): AgentsClient { const serverClient = createRuntimeServerClient(config) return { + signal: (options) => serverClient.signalEntity(options), + kill: (entityUrl, reason) => + serverClient.signalEntity({ + entityUrl, + signal: `SIGKILL`, + reason, + }), async observe(source) { if (source.sourceType === `entity`) { const info = await serverClient.getEntityInfo( diff --git a/packages/agents-runtime/src/context-factory.ts b/packages/agents-runtime/src/context-factory.ts index b5da05d619..50108ae5c8 100644 --- a/packages/agents-runtime/src/context-factory.ts +++ b/packages/agents-runtime/src/context-factory.ts @@ -21,6 +21,7 @@ import type { AgentModel, AgentRunResult, AgentTool, + EntitySignal, EntityHandle, EntityStreamDBWithActions, HandlerContext, @@ -65,6 +66,14 @@ export interface HandlerContextConfig { writeEvent: (event: ChangeEvent) => void wakeSession: WakeSession wakeEvent: WakeEvent + runSignal?: AbortSignal + registerSignalHandler?: ( + handler: (signal: { + signal: EntitySignal + reason?: string + payload?: unknown + }) => void | Promise + ) => void doObserve: ( source: ObservationSource, wake?: Wake @@ -396,7 +405,7 @@ export function createHandlerContext( ) } - await handle.run(runInput) + await handle.run(runInput, config.runSignal) runtimeLog.info(logPrefix, `agent.run completed`) return { @@ -575,6 +584,9 @@ export function createHandlerContext( afterMs: opts?.afterMs, }) }, + onSignal(handler): void { + config.registerSignalHandler?.(handler) + }, recordRun(): RunHandle { const key = nextRunKey() let deltaCounter = 0 diff --git a/packages/agents-runtime/src/entity-schema.ts b/packages/agents-runtime/src/entity-schema.ts index 3f7d62cfb6..bde4357c53 100644 --- a/packages/agents-runtime/src/entity-schema.ts +++ b/packages/agents-runtime/src/entity-schema.ts @@ -45,7 +45,31 @@ type SequencedPersistedRow = Omit< _seq?: number } type Schema = z.ZodType -type ChildEntityStatusValue = `spawning` | `running` | `idle` | `stopped` +type ChildEntityStatusValue = + | `spawning` + | `running` + | `idle` + | `paused` + | `stopping` + | `stopped` + | `killed` +export type EntitySignal = + | `SIGINT` + | `SIGHUP` + | `SIGTERM` + | `SIGKILL` + | `SIGSTOP` + | `SIGCONT` + | `SIGUSR` +type SignalHandlingStatus = `unhandled` | `handled` +type SignalOutcome = + | `transitioned` + | `ignored` + | `invalid_for_state` + | `delivered` + | `aborted` + | `shutdown_requested` + | `failed` type TagEntryValue = { key?: string value: string @@ -162,6 +186,20 @@ type EntityStoppedValue = { timestamp: string reason?: string } +type SignalValue = { + key?: string + signal: EntitySignal + status: SignalHandlingStatus + sender?: string + reason?: string + payload?: unknown + timestamp: string + handled_at?: string + handled_by?: string + outcome?: SignalOutcome + previous_state?: ChildEntityStatusValue + new_state?: ChildEntityStatusValue +} type ChildStatusEntryValue = { key?: string entity_url: string @@ -270,7 +308,27 @@ function createJsonObjectSchema(): Schema> { } function createChildEntityStatusSchema(): Schema { - return z.enum([`spawning`, `running`, `idle`, `stopped`]) + return z.enum([ + `spawning`, + `running`, + `idle`, + `paused`, + `stopping`, + `stopped`, + `killed`, + ]) +} + +function createEntitySignalSchema(): Schema { + return z.enum([ + `SIGINT`, + `SIGHUP`, + `SIGTERM`, + `SIGKILL`, + `SIGSTOP`, + `SIGCONT`, + `SIGUSR`, + ]) } function createWakeChangeSchema(): Schema { @@ -437,6 +495,33 @@ function createEntityStoppedSchema(): Schema { }) } +function createSignalSchema(): Schema { + return z.object({ + key: z.string().optional(), + signal: createEntitySignalSchema(), + status: z.enum([`unhandled`, `handled`]), + sender: z.string().optional(), + reason: z.string().optional(), + payload: z.unknown().optional(), + timestamp: z.string(), + handled_at: z.string().optional(), + handled_by: z.string().optional(), + outcome: z + .enum([ + `transitioned`, + `ignored`, + `invalid_for_state`, + `delivered`, + `aborted`, + `shutdown_requested`, + `failed`, + ]) + .optional(), + previous_state: createChildEntityStatusSchema().optional(), + new_state: createChildEntityStatusSchema().optional(), + }) +} + function createChildStatusSchema(): Schema { return z.object({ key: z.string().optional(), @@ -591,6 +676,7 @@ export type MessageReceived = SequencedPersistedRow export type WakeEntry = SequencedPersistedRow export type EntityCreated = SequencedPersistedRow export type EntityStopped = SequencedPersistedRow +export type Signal = SequencedPersistedRow export type ChildStatusEntry = SequencedPersistedRow export type TagEntry = SequencedPersistedRow export type ContextInserted = SequencedPersistedRow @@ -660,6 +746,7 @@ export const ENTITY_COLLECTIONS = { wakes: `wakes`, entityCreated: `entityCreated`, entityStopped: `entityStopped`, + signals: `signals`, childStatus: `childStatus`, tags: `tags`, manifests: `manifests`, @@ -685,6 +772,7 @@ export const BUILT_IN_EVENT_SCHEMAS = { createEntityCreatedSchema() as unknown as BuiltInEntitySchema, entity_stopped: createEntityStoppedSchema() as unknown as BuiltInEntitySchema, + signal: createSignalSchema() as unknown as BuiltInEntitySchema, child_status: createChildStatusSchema() as unknown as BuiltInEntitySchema, tags: createTagEntrySchema() as unknown as BuiltInEntitySchema, @@ -714,6 +802,7 @@ type EntityCollectionsDefinition = { wakes: CollectionDefinition entityCreated: CollectionDefinition entityStopped: CollectionDefinition + signals: CollectionDefinition childStatus: CollectionDefinition tags: CollectionDefinition manifests: CollectionDefinition @@ -786,6 +875,11 @@ export const builtInCollections: EntityCollectionsDefinition = { type: `entity_stopped`, primaryKey: `key`, }, + signals: { + schema: BUILT_IN_EVENT_SCHEMAS.signal as StandardSchemaV1, + type: `signal`, + primaryKey: `key`, + }, childStatus: { schema: BUILT_IN_EVENT_SCHEMAS.child_status as StandardSchemaV1, @@ -836,6 +930,7 @@ export const entityStateSchema: StateSchema = /** Event types that are management/bookkeeping rather than agent content. */ const MANAGEMENT_TYPES = new Set([ `entity_created`, + `signal`, `manifest`, `replay_watermark`, `ack`, diff --git a/packages/agents-runtime/src/index.ts b/packages/agents-runtime/src/index.ts index 5668a1f3b2..f4dffc3fce 100644 --- a/packages/agents-runtime/src/index.ts +++ b/packages/agents-runtime/src/index.ts @@ -90,6 +90,8 @@ export type { WakeEntry, EntityCreated, EntityStopped, + EntitySignal, + Signal, ChildStatusEntry, TagEntry, ContextInserted as ContextInsertedEvent, diff --git a/packages/agents-runtime/src/pi-adapter.ts b/packages/agents-runtime/src/pi-adapter.ts index cec2d2132b..47d13d8439 100644 --- a/packages/agents-runtime/src/pi-adapter.ts +++ b/packages/agents-runtime/src/pi-adapter.ts @@ -51,9 +51,10 @@ interface PiAgentAdapterConfig { } interface PiAgentHandle { - run: (input?: string) => Promise + run: (input?: string, abortSignal?: AbortSignal) => Promise steer: (message: string) => void isRunning: () => boolean + abort: () => void dispose: () => void } @@ -177,6 +178,7 @@ export function createPiAgentAdapter( let disposed = false let stepStartTime = 0 let textStarted = false + let abortedRun = false const model = resolvePiModel({ model: opts.model, @@ -267,7 +269,12 @@ export function createPiAgentAdapter( | undefined const isError = - msg?.stopReason === `error` || !!msg?.errorMessage + msg?.stopReason === `error` || + (!!msg?.errorMessage && msg.stopReason !== `aborted`) + const isAborted = msg?.stopReason === `aborted` + if (isAborted) { + abortedRun = true + } if (isError) { runtimeLog.error( @@ -295,9 +302,11 @@ export function createPiAgentAdapter( ) const finishReason = isError ? `error` - : hasToolCalls - ? `tool_calls` - : `stop` + : isAborted + ? `aborted` + : hasToolCalls + ? `tool_calls` + : `stop` bridge.onStepEnd({ finishReason, durationMs: Date.now() - stepStartTime, @@ -335,7 +344,9 @@ export function createPiAgentAdapter( } case `agent_end`: { - bridge.onRunEnd({ finishReason: `stop` }) + bridge.onRunEnd({ + finishReason: abortedRun ? `aborted` : `stop`, + }) runtimeLog.debug( logPrefix, `pi-adapter agent_end textDeltas=${textDeltaCount} ` + @@ -367,30 +378,56 @@ export function createPiAgentAdapter( } return { - async run(input?: string): Promise { + async run(input?: string, abortSignal?: AbortSignal): Promise { running = true + abortedRun = false bridge.onRunStart() return new Promise((resolve, reject) => { - const unsubscribe = processAgentEvents( + let settled = false + let unsubscribe = (): void => {} + const finish = (finishReason: `stop` | `aborted` | `error`): void => { + if (settled) return + settled = true + running = false + abortSignal?.removeEventListener(`abort`, abortRun) + unsubscribe() + bridge.onRunEnd({ finishReason }) + } + const abortRun = (): void => { + if (settled) return + abortedRun = true + agent.abort() + finish(`aborted`) + resolve() + } + unsubscribe = processAgentEvents( () => { + if (settled) return + settled = true + running = false + abortSignal?.removeEventListener(`abort`, abortRun) unsubscribe() resolve() }, (err) => { - unsubscribe() + if (settled) return + finish(`error`) reject(err) } ) + abortSignal?.addEventListener(`abort`, abortRun, { once: true }) const runPromise = input !== undefined ? agent.prompt(input) : agent.continue() + if (abortSignal?.aborted) { + abortRun() + } Promise.resolve(runPromise).catch((err: Error) => { - running = false - bridge.onRunEnd({ finishReason: `error` }) - unsubscribe() + if (settled) return + finish(`error`) reject(err) }) }) @@ -408,8 +445,13 @@ export function createPiAgentAdapter( return running }, + abort(): void { + agent.abort() + }, + dispose(): void { disposed = true + agent.abort() running = false }, } diff --git a/packages/agents-runtime/src/process-wake.ts b/packages/agents-runtime/src/process-wake.ts index a8cac7eaaf..329615ebd5 100644 --- a/packages/agents-runtime/src/process-wake.ts +++ b/packages/agents-runtime/src/process-wake.ts @@ -31,6 +31,7 @@ import type { WakeSession, WebhookNotification, } from './types' +import type { Signal } from './entity-schema' import type { JsonBatch } from '@durable-streams/client' import type { ChangeEvent, StateEvent } from '@durable-streams/state' @@ -463,6 +464,17 @@ export async function processWake( // Live event handler — wired after preload, processes child_status + inbox let idleTimer: ReturnType | null = null let idleController: AbortController | null = null + let runAbortController: AbortController | null = null + let activeSignalHandler: + | (( + signal: Pick + ) => void | Promise) + | null = null + let pendingRunAbortRequested = false + let signalAbortRequested = false + let pauseRequested = false + let resumeRequested = false + const pendingSignalHandlers: Array> = [] const secondaryDbs: Array<{ drainPendingWrites?: () => Promise flushWrites?: () => Promise @@ -471,6 +483,7 @@ export async function processWake( }> = [] let liveProcessError: Error | null = null let acceptLiveInputs = false + const handledSignalKeys = new Set() const compareOffsets = (left: string, right: string): number => { if (left === right) return 0 @@ -550,6 +563,10 @@ export async function processWake( } function handleRuntimeSideEffectEvent(event: ChangeEvent): void { + if (event.type === `signal`) { + handleSignalEvent(event) + return + } if (event.type === `child_status` && result) { const spawnHandles = result.wakeSession.getSpawnHandles() const val = event.value as @@ -609,6 +626,180 @@ export async function processWake( return event.type === `inbox` || event.type === `wake` } + const shouldHandleSignalEvent = (event: ChangeEvent): boolean => { + if (event.type !== `signal`) return false + const value = event.value as Partial | undefined + if (value?.status === `unhandled`) return true + + // The server handles SIGKILL because it is terminal and closes streams, but + // an active runtime still needs to observe the signal and abort in-flight + // model/tool work. Do not try to rewrite the already-closed stream. + return value?.signal === `SIGKILL` && value.status === `handled` + } + + const markSignalHandled = ( + event: ChangeEvent, + outcome: + | `aborted` + | `ignored` + | `shutdown_requested` + | `transitioned` + | `delivered` + | `failed`, + newState?: string + ): void => { + const value = event.value as Partial | undefined + const key = String(event.key) + writeEvent( + entityStateSchema.signals.update({ + key, + value: { + ...value, + status: `handled`, + handled_at: new Date().toISOString(), + handled_by: entityUrl, + outcome, + previous_state: notification.entity?.status, + ...(newState ? { new_state: newState } : {}), + } as never, + }) as ChangeEvent + ) + } + + const invokeSignalHandler = ( + value: Partial, + onSettled: (outcome: `delivered` | `failed`) => void + ): void => { + if (!activeSignalHandler || !value.signal) { + onSettled(`delivered`) + return + } + + const task = Promise.resolve( + activeSignalHandler({ + signal: value.signal, + reason: value.reason, + payload: value.payload, + }) + ).then( + () => onSettled(`delivered`), + (err) => { + failBackgroundWake(err, `SIGNAL_HANDLER_FAILED`) + onSettled(`failed`) + } + ) + pendingSignalHandlers.push(task) + void task.finally(() => { + const index = pendingSignalHandlers.indexOf(task) + if (index >= 0) pendingSignalHandlers.splice(index, 1) + }) + } + + const handleSignalEvent = (event: ChangeEvent): void => { + if (!shouldHandleSignalEvent(event)) return + const key = String(event.key) + if (handledSignalKeys.has(key)) return + handledSignalKeys.add(key) + + const value = event.value as Partial + const alreadyHandled = value.status === `handled` + switch (value.signal) { + case `SIGINT`: + if (runAbortController) { + log.info(`SIGINT received, aborting active run`) + runAbortController.abort() + } else if (notification.entity?.status === `running`) { + // SIGINT may arrive in the small window before ctx.agent.run creates + // its controller. Only carry it forward for running entities; idle + // SIGINT is an ignored no-op and must not poison the next run. + log.info( + `SIGINT received before active run controller, queuing abort` + ) + pendingRunAbortRequested = true + } else { + log.info(`SIGINT received with no active run, ignoring`) + markSignalHandled(event, `ignored`, notification.entity?.status) + void flushProducedWrites().catch((err) => + failBackgroundWake(err, `SIGNAL_HANDLE_FAILED`) + ) + return + } + markSignalHandled(event, `aborted`, notification.entity?.status) + void flushProducedWrites().catch((err) => + failBackgroundWake(err, `SIGNAL_HANDLE_FAILED`) + ) + return + case `SIGKILL`: + log.info(`SIGKILL received, aborting active run and closing wake`) + signalAbortRequested = true + runAbortController?.abort() + requestShutdown() + if (!alreadyHandled) { + markSignalHandled(event, `shutdown_requested`, `killed`) + void flushProducedWrites().catch((err) => + failBackgroundWake(err, `SIGNAL_HANDLE_FAILED`) + ) + } + return + case `SIGHUP`: + log.info(`SIGHUP received, closing wake after current checkpoint`) + requestShutdown() + invokeSignalHandler(value, () => { + markSignalHandled( + event, + `shutdown_requested`, + notification.entity?.status + ) + void flushProducedWrites().catch((err) => + failBackgroundWake(err, `SIGNAL_HANDLE_FAILED`) + ) + }) + return + case `SIGTERM`: + log.info(`SIGTERM received, closing wake after cleanup`) + requestShutdown() + invokeSignalHandler(value, (outcome) => { + markSignalHandled( + event, + outcome === `failed` ? `failed` : `shutdown_requested`, + `stopped` + ) + void flushProducedWrites().catch((err) => + failBackgroundWake(err, `SIGNAL_HANDLE_FAILED`) + ) + }) + return + case `SIGSTOP`: + log.info(`SIGSTOP received, pausing after current checkpoint`) + clearIdleTimer() + idleController?.abort() + pauseRequested = true + markSignalHandled(event, `transitioned`, `paused`) + void flushProducedWrites().catch((err) => + failBackgroundWake(err, `SIGNAL_HANDLE_FAILED`) + ) + return + case `SIGCONT`: + log.info(`SIGCONT received, resuming queued work`) + resumeRequested = true + clearIdleTimer() + idleController?.abort() + markSignalHandled(event, `transitioned`, `idle`) + void flushProducedWrites().catch((err) => + failBackgroundWake(err, `SIGNAL_HANDLE_FAILED`) + ) + return + case `SIGUSR`: + invokeSignalHandler(value, (outcome) => { + markSignalHandled(event, outcome, notification.entity?.status) + void flushProducedWrites().catch((err) => + failBackgroundWake(err, `SIGNAL_HANDLE_FAILED`) + ) + }) + return + } + } + const filterAcceptedLiveEvents = ( batch: JsonBatch ): Array => { @@ -794,6 +985,12 @@ export async function processWake( return } + for (const event of changeEvents) { + if (event.type === `signal`) { + handleSignalEvent(event) + } + } + catchUpEvents.push(...changeEvents) if ( @@ -873,6 +1070,12 @@ export async function processWake( claimedWake = true writeToken = claimed.writeToken ?? `` + for (const event of catchUpEvents) { + if (event.type === `signal`) { + handleSignalEvent(event) + } + } + // 3b. Start heartbeat once this worker owns the wake heartbeat = setInterval(() => { createClaimCallbackHeaders(claimHeaderConfig, activeClaimToken) @@ -1376,6 +1579,10 @@ export async function processWake( const waitForSharedStateWiring = async (): Promise => { await pendingSharedStateWiring } + const waitForSignalHandlers = async (): Promise => { + if (pendingSignalHandlers.length === 0) return + await Promise.allSettled([...pendingSignalHandlers]) + } const drainAllPendingWrites = async (): Promise => { await db.utils.drainPendingWrites() for (const sdb of secondaryDbs) { @@ -1525,6 +1732,25 @@ export async function processWake( for (;;) { if (skipInitialHandlerPass) { skipInitialHandlerPass = false + if (signalAbortRequested) { + signalAbortRequested = false + drainNonFreshPendingBatches() + log.info(`signal abort completed, closing wake`) + break + } + if (pauseRequested) { + pauseRequested = false + drainNonFreshPendingBatches() + log.info(`pause requested, closing wake`) + break + } + if (resumeRequested) { + resumeRequested = false + if (await promoteNextPendingInboxMessage()) { + currentWakeEvents = [] + continue + } + } const resumed = await awaitIdleForFreshWork( `skipping initial handler pass: no fresh wake input in catch-up; entering idle (${idleTimeout / 1000}s timeout)` ) @@ -1563,6 +1789,11 @@ export async function processWake( await promoteNextPendingInboxMessage() } + runAbortController = new AbortController() + if (pendingRunAbortRequested) { + pendingRunAbortRequested = false + runAbortController.abort() + } const { ctx: handlerCtx, getSleepRequested } = createHandlerContext({ entityUrl, entityType: typeName, @@ -1583,6 +1814,10 @@ export async function processWake( writeEvent, wakeSession, wakeEvent: currentWakeEvent, + runSignal: runAbortController.signal, + registerSignalHandler: (handler) => { + activeSignalHandler = handler + }, doObserve, doSpawn, doMkdb, @@ -1614,6 +1849,8 @@ export async function processWake( await definition.handler(handlerCtx, currentWakeEvent) handlerMs += +(performance.now() - handlerT0).toFixed(2) log.info(`handler returned`) + await waitForSignalHandlers() + activeSignalHandler = null await waitForSharedStateWiring() await drainAllPendingWrites() await Promise.all(pendingWakeRegistrations) @@ -1632,6 +1869,8 @@ export async function processWake( throw liveProcessError } } catch (setupErr) { + runAbortController = null + activeSignalHandler = null wakeSession.rollbackManifestEntries() const errMsg = toError(setupErr).message log.error(`handler failed for ${entityUrl}:`, errMsg) @@ -1671,6 +1910,8 @@ export async function processWake( } throw setupErr } + runAbortController = null + activeSignalHandler = null if (!result) { result = { @@ -1687,12 +1928,26 @@ export async function processWake( break } + if (signalAbortRequested) { + signalAbortRequested = false + drainNonFreshPendingBatches() + log.info(`signal abort completed, closing wake`) + break + } + if (sleepRequested) { drainNonFreshPendingBatches() log.info(`handler returned with sleep(), closing immediately`) break } + if (pauseRequested) { + pauseRequested = false + drainNonFreshPendingBatches() + log.info(`pause requested, closing wake`) + break + } + const nextWake = dequeueNextWakeFromPending() if (nextWake) { log.info( diff --git a/packages/agents-runtime/src/runtime-server-client.ts b/packages/agents-runtime/src/runtime-server-client.ts index ed8e241d06..c0ebfabfec 100644 --- a/packages/agents-runtime/src/runtime-server-client.ts +++ b/packages/agents-runtime/src/runtime-server-client.ts @@ -77,11 +77,28 @@ export interface RegisterWakeOptions { manifestKey?: string } +export type EntitySignal = + | `SIGINT` + | `SIGHUP` + | `SIGTERM` + | `SIGKILL` + | `SIGSTOP` + | `SIGCONT` + | `SIGUSR` + +export interface SignalEntityOptions { + entityUrl: string + signal: EntitySignal + reason?: string + payload?: unknown +} + export interface RuntimeServerClient { sendEntityMessage: (options: SendEntityMessageOptions) => Promise spawnEntity: (options: SpawnEntityOptions) => Promise getEntityInfo: (entityUrl: string) => Promise ensureSharedStateStream: (sharedStateId: string) => Promise + signalEntity: (options: SignalEntityOptions) => Promise<{ txid: number }> deleteEntity: (entityUrl: string) => Promise getSharedStateStreamPath: (sharedStateId: string) => string registerWake: (options: RegisterWakeOptions) => Promise @@ -144,8 +161,8 @@ export function getSharedStateStreamPath(sharedStateId: string): string { return `/_electric/shared-state/${sharedStateId}` } -function entityRpcPath(entityUrl: string): string { - return `/_electric/entities${entityUrl}` +function entityRpcPath(entityUrl: string, suffix = ``): string { + return `/_electric/entities${entityUrl}${suffix}` } export function createRuntimeServerClient( @@ -342,15 +359,42 @@ export function createRuntimeServerClient( return streamPath } - const deleteEntity = async (entityUrl: string): Promise => { - const response = await request(entityRpcPath(entityUrl), { - method: `DELETE`, + const signalEntity = async ({ + entityUrl, + signal, + reason, + payload, + }: SignalEntityOptions): Promise<{ txid: number }> => { + const body: Record = { signal } + if (reason !== undefined) body.reason = reason + if (payload !== undefined) body.payload = payload + + const response = await request(entityRpcPath(entityUrl, `/signal`), { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify(body), }) - if (!response.ok && response.status !== 404) { + if (!response.ok) { throw new Error( - `delete ${entityUrl} failed (${response.status}): ${await readErrorText(response)}` + `signal ${entityUrl} ${signal} failed (${response.status}): ${await readErrorText(response)}` ) } + return (await response.json()) as { txid: number } + } + + const deleteEntity = async (entityUrl: string): Promise => { + try { + await signalEntity({ + entityUrl, + signal: `SIGKILL`, + reason: `Runtime child cleanup`, + }) + } catch (err) { + if (err instanceof Error && /\(404\)/.test(err.message)) { + return + } + throw err + } } const registerWake = async (options: RegisterWakeOptions): Promise => { @@ -540,6 +584,7 @@ export function createRuntimeServerClient( spawnEntity, getEntityInfo, ensureSharedStateStream, + signalEntity, deleteEntity, getSharedStateStreamPath, registerWake, diff --git a/packages/agents-runtime/src/types.ts b/packages/agents-runtime/src/types.ts index 4546a87aee..416e580508 100644 --- a/packages/agents-runtime/src/types.ts +++ b/packages/agents-runtime/src/types.ts @@ -37,6 +37,7 @@ import type { ContextEntryAttrs as EntityContextEntryAttrs, ContextInserted as EntityContextInserted, ContextRemoved as EntityContextRemoved, + EntitySignal, Manifest as EntityManifest, ManifestChildEntry as EntityManifestChildEntry, ManifestContextEntry as EntityManifestContextEntry, @@ -61,6 +62,7 @@ export type EntityStreamDBWithActions< actions: GeneratedStateActions & HandlerActions } export type ChildStatus = ChildStatusEntry +export type { EntitySignal } export type ObservationCollectionMap = Record export type ObservationStreamDB = BaseStreamDB export type EntitiesObservationHandle = ObservationHandle & { @@ -885,6 +887,18 @@ export interface HandlerContext< payload: unknown, opts?: { type?: string; afterMs?: number } ) => void + /** + * Register a handler for lifecycle signals delivered while this wake is active. + * Only catchable signals are delivered here; SIGKILL is terminal and bypasses + * entity code. + */ + onSignal: ( + handler: (signal: { + signal: EntitySignal + reason?: string + payload?: unknown + }) => void | Promise + ) => void /** * Record a non-LLM run on the entity's built-in `runs` collection. * Use this to bracket an external operation (CLI subprocess, HTTP diff --git a/packages/agents-runtime/test/electric-agents-client.test.ts b/packages/agents-runtime/test/electric-agents-client.test.ts index 9d1b238d9d..83b56ba239 100644 --- a/packages/agents-runtime/test/electric-agents-client.test.ts +++ b/packages/agents-runtime/test/electric-agents-client.test.ts @@ -7,6 +7,7 @@ const { mockState } = vi.hoisted(() => ({ mockState: { registerEntitiesSource: vi.fn(), registerCronSource: vi.fn(), + signalEntity: vi.fn(), createStreamDB: vi.fn(), preload: vi.fn(), observedDb: { @@ -22,6 +23,7 @@ vi.mock(`../src/runtime-server-client`, () => ({ createRuntimeServerClient: () => ({ registerEntitiesSource: mockState.registerEntitiesSource, registerCronSource: mockState.registerCronSource, + signalEntity: mockState.signalEntity, }), })) @@ -43,6 +45,7 @@ describe(`createAgentsClient`, () => { streamUrl: `/_entities/source-1`, }) mockState.createStreamDB = vi.fn() + mockState.signalEntity = vi.fn().mockResolvedValue({ txid: 123 }) mockState.observedDb = { preload: vi.fn().mockResolvedValue(undefined), collections: { @@ -117,4 +120,31 @@ describe(`createAgentsClient`, () => { state: source.schema, }) }) + + it(`exposes signal and kill helpers through the server client`, async () => { + const client = createAgentsClient({ + baseUrl: `http://electric-agents.test`, + }) + + await expect( + client.signal({ + entityUrl: `/chat/demo`, + signal: `SIGINT`, + reason: `stop`, + }) + ).resolves.toEqual({ txid: 123 }) + + await client.kill(`/chat/demo`, `cleanup`) + + expect(mockState.signalEntity).toHaveBeenNthCalledWith(1, { + entityUrl: `/chat/demo`, + signal: `SIGINT`, + reason: `stop`, + }) + expect(mockState.signalEntity).toHaveBeenNthCalledWith(2, { + entityUrl: `/chat/demo`, + signal: `SIGKILL`, + reason: `cleanup`, + }) + }) }) diff --git a/packages/agents-runtime/test/helpers/event-fixtures.ts b/packages/agents-runtime/test/helpers/event-fixtures.ts index 9b3c8a4cd5..f2c87fe390 100644 --- a/packages/agents-runtime/test/helpers/event-fixtures.ts +++ b/packages/agents-runtime/test/helpers/event-fixtures.ts @@ -43,6 +43,31 @@ export function ev( headers: normalizedHeaders, }) as ChangeEvent + case `signal`: + return ( + operation === `insert` + ? entityStateSchema.signals.insert({ + key, + value: { + signal: `SIGINT`, + status: `unhandled`, + timestamp: FIXED_TIMESTAMP, + ...value, + } as never, + headers: normalizedHeaders, + }) + : entityStateSchema.signals.update({ + key, + value: { + signal: `SIGINT`, + status: `handled`, + timestamp: FIXED_TIMESTAMP, + ...value, + } as never, + headers: normalizedHeaders, + }) + ) as ChangeEvent + case `run`: return ( operation === `insert` diff --git a/packages/agents-runtime/test/pi-adapter.test.ts b/packages/agents-runtime/test/pi-adapter.test.ts index 38c4d2841c..a9f4e98ea0 100644 --- a/packages/agents-runtime/test/pi-adapter.test.ts +++ b/packages/agents-runtime/test/pi-adapter.test.ts @@ -4,6 +4,7 @@ import { resolvePiModel, toAgentHistory, } from '../src/pi-adapter' +import { createAssistantMessageEventStream } from '@mariozechner/pi-ai' import type { OutboundIdSeed } from '../src/outbound-bridge' import type { LLMMessage } from '../src/types' import type { ChangeEvent } from '@durable-streams/state' @@ -52,9 +53,110 @@ describe(`createPiAgentAdapter`, () => { expect(typeof handle.run).toBe(`function`) expect(typeof handle.steer).toBe(`function`) expect(typeof handle.isRunning).toBe(`function`) + expect(typeof handle.abort).toBe(`function`) expect(typeof handle.dispose).toBe(`function`) }) + it(`aborts an active run when the run signal is aborted`, async () => { + let abortSeenResolve: (() => void) | null = null + const abortSeen = new Promise((resolve) => { + abortSeenResolve = resolve + }) + const abortedMessage: AssistantMessage = { + role: `assistant`, + content: [{ type: `text`, text: `` }], + api: `anthropic-messages`, + provider: `anthropic`, + model: `claude-sonnet-4-5-20250929`, + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + total: 0, + }, + }, + stopReason: `aborted`, + timestamp: Date.now(), + } + + const factory = createPiAgentAdapter({ + systemPrompt: `Test system prompt`, + model: `claude-sonnet-4-5-20250929`, + tools: [], + streamFn: (_model, _context, options) => { + const stream = createAssistantMessageEventStream() + if (options?.signal?.aborted) { + abortSeenResolve?.() + stream.end(abortedMessage) + return stream + } + options?.signal?.addEventListener( + `abort`, + () => { + abortSeenResolve?.() + stream.end(abortedMessage) + }, + { once: true } + ) + return stream + }, + }) + + const handle = factory({ + entityUrl: `test/entity-1`, + epoch: 1, + messages: [], + outboundIdSeed: { run: 0, step: 0, msg: 0, tc: 0 }, + writeEvent: (_event: ChangeEvent) => {}, + }) + const controller = new AbortController() + const runPromise = handle.run(`hello`, controller.signal) + + controller.abort() + await abortSeen + await expect(runPromise).resolves.toBeUndefined() + expect(handle.isRunning()).toBe(false) + }) + + it(`settles an aborted run even if the model stream does not emit completion`, async () => { + const factory = createPiAgentAdapter({ + systemPrompt: `Test system prompt`, + model: `claude-sonnet-4-5-20250929`, + tools: [], + streamFn: (_model, _context, options) => { + const stream = createAssistantMessageEventStream() + options?.signal?.addEventListener(`abort`, () => {}, { once: true }) + return stream + }, + }) + + const handle = factory({ + entityUrl: `test/entity-1`, + epoch: 1, + messages: [], + outboundIdSeed: { run: 0, step: 0, msg: 0, tc: 0 }, + writeEvent: (_event: ChangeEvent) => {}, + }) + const controller = new AbortController() + const runPromise = handle.run(`hello`, controller.signal) + + controller.abort() + await expect( + Promise.race([ + runPromise.then(() => `resolved`), + new Promise((resolve) => setTimeout(() => resolve(`timed-out`), 50)), + ]) + ).resolves.toBe(`resolved`) + expect(handle.isRunning()).toBe(false) + }) + it(`isRunning returns false initially`, () => { const factory = createPiAgentAdapter({ systemPrompt: `Test system prompt`, diff --git a/packages/agents-runtime/test/process-wake.test.ts b/packages/agents-runtime/test/process-wake.test.ts index d1fbf6df5b..18f6265d89 100644 --- a/packages/agents-runtime/test/process-wake.test.ts +++ b/packages/agents-runtime/test/process-wake.test.ts @@ -1,5 +1,6 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { createTransaction } from '@durable-streams/state' +import { createAssistantMessageEventStream } from '@mariozechner/pi-ai' import { getCronSourceRef } from '../src/cron-utils' import { manifestSourceKey } from '../src/manifest-helpers' import { db } from '../src/observation-sources' @@ -13,6 +14,7 @@ import type { MockInstance } from 'vitest' import type { ProcessWakeConfig, WebhookNotification } from '../src/types' import type { ChangeEvent } from '@durable-streams/state' import type { ErrorEvent, Manifest } from '../src/entity-schema' +import type { AssistantMessage } from '@mariozechner/pi-ai' // --------------------------------------------------------------------------- // Mock @durable-streams/client @@ -578,6 +580,443 @@ describe(`processWake`, () => { expect(doneCalls).toHaveLength(0) }) + it(`closes immediately for SIGSTOP when there is no handler pass to checkpoint`, async () => { + const handler = vi.fn() + defineEntity(`test-agent`, { + handler, + }) + + mockDbPreload.mockImplementationOnce(async () => { + mockEntityOnBatch.current?.({ + items: [ + ev( + `signal`, + `sigstop-1`, + `insert`, + { signal: `SIGSTOP`, status: `unhandled` }, + { offset: `10_500` } + ), + ], + offset: `10_500`, + }) + }) + + const notification = makeNotification() + notification.entity!.status = `running` + + await processWake(notification, BASE_CONFIG) + + expect(handler).not.toHaveBeenCalled() + const doneCalls = fetchMock.mock.calls.filter( + ([url, opts]) => + String(url).includes(`/_electric/wakes/wake-abc`) && + (opts?.body as string | undefined)?.includes(`"done":true`) + ) + expect(doneCalls).toHaveLength(1) + const body = JSON.parse(doneCalls[0]![1]!.body as string) as { + acks: Array<{ path: string; offset: string }> + done: boolean + } + expect(body.done).toBe(true) + expect(body.acks).toEqual([ + { path: `/streams/entity:agent-1`, offset: `10_500` }, + ]) + }) + + it(`lets SIGSTOP pause at the next handler checkpoint without aborting the run`, async () => { + const handlerEvents: Array = [] + let handlerStartedResolve: (() => void) | null = null + const handlerStarted = new Promise((resolve) => { + handlerStartedResolve = resolve + }) + let releaseHandler = (): void => { + throw new Error(`expected handler release`) + } + const handlerBlock = new Promise((resolve) => { + releaseHandler = resolve + }) + + defineEntity(`test-agent`, { + handler: async () => { + handlerEvents.push(`started`) + handlerStartedResolve?.() + await handlerBlock + handlerEvents.push(`completed`) + }, + }) + + const notification = makeNotification() + notification.entity!.status = `running` + const wakePromise = processWake(notification, BASE_CONFIG) + await handlerStarted + + mockEntityOnBatch.current?.({ + items: [ + ev( + `signal`, + `sigstop-1`, + `insert`, + { signal: `SIGSTOP`, status: `unhandled` }, + { offset: `10_500` } + ), + ], + offset: `10_500`, + }) + + await new Promise((resolve) => setTimeout(resolve, 5)) + expect(handlerEvents).toEqual([`started`]) + + releaseHandler() + await wakePromise + + expect(handlerEvents).toEqual([`started`, `completed`]) + const doneCalls = fetchMock.mock.calls.filter( + ([url, opts]) => + String(url).includes(`/_electric/wakes/wake-abc`) && + (opts?.body as string | undefined)?.includes(`"done":true`) + ) + const body = JSON.parse(doneCalls.at(-1)![1]!.body as string) as { + acks: Array<{ path: string; offset: string }> + } + expect(body.acks).toEqual([ + { path: `/streams/entity:agent-1`, offset: `10_500` }, + ]) + }) + + it(`does not continue queued work after SIGSTOP is followed by SIGINT`, async () => { + const wakePayloads: Array = [] + let handlerStartedResolve: (() => void) | null = null + const handlerStarted = new Promise((resolve) => { + handlerStartedResolve = resolve + }) + let releaseHandler = (): void => { + throw new Error(`expected handler release`) + } + const handlerBlock = new Promise((resolve) => { + releaseHandler = resolve + }) + + defineEntity(`test-agent`, { + handler: async (_ctx, wake) => { + wakePayloads.push(wake.payload) + if (wakePayloads.length === 1) { + handlerStartedResolve?.() + await handlerBlock + } + }, + }) + + const notification = makeNotification({ triggerEvent: `inbox` }) + notification.entity!.status = `running` + const wakePromise = processWake(notification, BASE_CONFIG) + await handlerStarted + + mockEntityOnBatch.current?.({ + items: [ + ev( + `signal`, + `sigstop-1`, + `insert`, + { signal: `SIGSTOP`, status: `unhandled` }, + { offset: `10_500` } + ), + ], + offset: `10_500`, + }) + mockEntityOnBatch.current?.({ + items: [ + ev( + `signal`, + `sigint-1`, + `insert`, + { signal: `SIGINT`, status: `unhandled` }, + { offset: `10_600` } + ), + ], + offset: `10_600`, + }) + mockEntityOnBatch.current?.({ + items: [ + ev( + `inbox`, + `m-queued`, + `insert`, + { payload: `should not run` }, + { offset: `10_700` } + ), + ], + offset: `10_700`, + }) + + releaseHandler() + await wakePromise + + expect(wakePayloads).toEqual([undefined]) + const doneCalls = fetchMock.mock.calls.filter( + ([url, opts]) => + String(url).includes(`/_electric/wakes/wake-abc`) && + (opts?.body as string | undefined)?.includes(`"done":true`) + ) + const body = JSON.parse(doneCalls.at(-1)![1]!.body as string) as { + acks: Array<{ path: string; offset: string }> + } + expect(body.acks).toEqual([ + { path: `/streams/entity:agent-1`, offset: `10_600` }, + ]) + }) + + it(`applies SIGINT that arrives before the handler run controller is created`, async () => { + let abortSeenResolve: (() => void) | null = null + const abortSeen = new Promise((resolve) => { + abortSeenResolve = resolve + }) + const abortedMessage: AssistantMessage = { + role: `assistant`, + content: [{ type: `text`, text: `` }], + api: `anthropic-messages`, + provider: `anthropic`, + model: `claude-sonnet-4-5-20250929`, + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + total: 0, + }, + }, + stopReason: `aborted`, + timestamp: Date.now(), + } + + defineEntity(`test-agent`, { + handler: async (ctx) => { + ctx.useAgent({ + systemPrompt: `test`, + model: `claude-sonnet-4-5-20250929`, + tools: [], + streamFn: (_model, _context, options) => { + const stream = createAssistantMessageEventStream() + if (options?.signal?.aborted) { + abortSeenResolve?.() + stream.end(abortedMessage) + return stream + } + options?.signal?.addEventListener( + `abort`, + () => { + abortSeenResolve?.() + stream.end(abortedMessage) + }, + { once: true } + ) + return stream + }, + }) + await ctx.agent.run() + }, + }) + + setTimeout(() => { + mockEntityOnBatch.current?.({ + items: [ + ev( + `signal`, + `sigint-early`, + `insert`, + { signal: `SIGINT`, status: `unhandled` }, + { offset: `10_500` } + ), + ], + offset: `10_500`, + }) + }, 0) + + const notification = makeNotification({ triggerEvent: `inbox` }) + notification.entity!.status = `running` + + await processWake(notification, BASE_CONFIG) + + await abortSeen + const doneCalls = fetchMock.mock.calls.filter( + ([url, opts]) => + String(url).includes(`/_electric/wakes/wake-abc`) && + (opts?.body as string | undefined)?.includes(`"done":true`) + ) + const body = JSON.parse(doneCalls.at(-1)![1]!.body as string) as { + acks: Array<{ path: string; offset: string }> + } + expect(body.acks).toEqual([ + { path: `/streams/entity:agent-1`, offset: `10_500` }, + ]) + }) + + it(`ignores SIGINT for idle entities without aborting the next run`, async () => { + let abortSeen = false + const completedMessage: AssistantMessage = { + role: `assistant`, + content: [{ type: `text`, text: `ok` }], + api: `anthropic-messages`, + provider: `anthropic`, + model: `claude-sonnet-4-5-20250929`, + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + total: 0, + }, + }, + stopReason: `stop`, + timestamp: Date.now(), + } + + defineEntity(`test-agent`, { + handler: async (ctx) => { + ctx.useAgent({ + systemPrompt: `test`, + model: `claude-sonnet-4-5-20250929`, + tools: [], + streamFn: (_model, _context, options) => { + const stream = createAssistantMessageEventStream() + if (options?.signal?.aborted) { + abortSeen = true + } + options?.signal?.addEventListener( + `abort`, + () => { + abortSeen = true + }, + { once: true } + ) + queueMicrotask(() => stream.end(completedMessage)) + return stream + }, + }) + await ctx.agent.run() + }, + }) + + mockDbPreload.mockImplementationOnce(async () => { + mockEntityOnBatch.current?.({ + items: [ + ev( + `signal`, + `sigint-idle`, + `insert`, + { signal: `SIGINT`, status: `unhandled` }, + { offset: `10_500` } + ), + ], + offset: `10_500`, + }) + }) + + const notification = makeNotification({ triggerEvent: `inbox` }) + notification.entity!.status = `idle` + + await processWake(notification, BASE_CONFIG) + + expect(abortSeen).toBe(false) + expect(mockProducerAppend).toHaveBeenCalledWith( + expect.stringContaining(`"outcome":"ignored"`) + ) + }) + + it(`aborts an active run for server-handled SIGKILL without rewriting the signal`, async () => { + let abortSeenResolve: (() => void) | null = null + const abortSeen = new Promise((resolve) => { + abortSeenResolve = resolve + }) + let handlerStartedResolve: (() => void) | null = null + const handlerStarted = new Promise((resolve) => { + handlerStartedResolve = resolve + }) + const abortedMessage: AssistantMessage = { + role: `assistant`, + content: [{ type: `text`, text: `` }], + api: `anthropic-messages`, + provider: `anthropic`, + model: `claude-sonnet-4-5-20250929`, + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + total: 0, + }, + }, + stopReason: `aborted`, + timestamp: Date.now(), + } + + defineEntity(`test-agent`, { + handler: async (ctx) => { + ctx.useAgent({ + systemPrompt: `test`, + model: `claude-sonnet-4-5-20250929`, + tools: [], + streamFn: (_model, _context, options) => { + handlerStartedResolve?.() + const stream = createAssistantMessageEventStream() + options?.signal?.addEventListener( + `abort`, + () => { + abortSeenResolve?.() + stream.end(abortedMessage) + }, + { once: true } + ) + return stream + }, + }) + await ctx.agent.run() + }, + }) + + const notification = makeNotification({ triggerEvent: `inbox` }) + notification.entity!.status = `running` + const wakePromise = processWake(notification, BASE_CONFIG) + await handlerStarted + + mockEntityOnBatch.current?.({ + items: [ + ev( + `signal`, + `sigkill-handled`, + `insert`, + { signal: `SIGKILL`, status: `handled` }, + { offset: `10_500` } + ), + ], + offset: `10_500`, + }) + + await wakePromise + await abortSeen + + expect(mockProducerAppend).not.toHaveBeenCalledWith( + expect.stringContaining(`sigkill-handled`) + ) + }) + it(`surfaces both the primary wake error and done callback failure`, async () => { defineEntity(`test-agent`, { handler: () => { diff --git a/packages/agents-server-conformance-tests/src/electric-agents-dsl.ts b/packages/agents-server-conformance-tests/src/electric-agents-dsl.ts index ced25fbd0b..aacf89f0a7 100644 --- a/packages/agents-server-conformance-tests/src/electric-agents-dsl.ts +++ b/packages/agents-server-conformance-tests/src/electric-agents-dsl.ts @@ -11,7 +11,7 @@ * .respondDone() * .expectStatus('running') * .kill() - * .expectStatus('stopped') + * .expectStatus('killed') * .run() */ @@ -1230,9 +1230,17 @@ async function executeStep(ctx: RunContext, step: Step): Promise { if (!entityUrl) throw new Error(`No current entity — did you spawn first?`) - const res = await electricAgentsFetch(ctx.baseUrl, entityUrl, { - method: `DELETE`, - }) + const res = await electricAgentsFetch( + ctx.baseUrl, + `${entityUrl}/signal`, + { + method: `POST`, + body: JSON.stringify({ + signal: `SIGKILL`, + reason: `Killed by conformance test`, + }), + } + ) expect(res.status).toBe(200) ctx.history.push({ @@ -2028,8 +2036,8 @@ function checkStreamPathsMatchEntityUrl(history: Array): void { /** * Spec S4 — Safety: entity status transitions must be valid. * spawning → running is valid (at spawn time) - * running/idle → stopped is valid (at kill time) - * stopped → running is NOT valid + * running/idle → killed is valid (at kill time) + * killed → running is NOT valid * Soundness: Sound | Completeness: Incomplete (only checks observed status reads) */ function checkStatusTransitionsValid(history: Array): void { @@ -2041,11 +2049,11 @@ function checkStatusTransitionsValid(history: Array): void { } if (event.type === `entity_status_checked`) { const prev = lastStatus.get(event.entityUrl) - if (prev === `stopped`) { + if (prev === `killed`) { expect( event.status, - `Safety: entity ${event.entityUrl} transitioned from stopped to ${event.status}` - ).toBe(`stopped`) + `Safety: entity ${event.entityUrl} transitioned from killed to ${event.status}` + ).toBe(`killed`) } lastStatus.set(event.entityUrl, event.status) } @@ -2237,7 +2245,7 @@ export interface EntityTypeModel { export interface EntityModel { url: string typeName: string - status: `running` | `stopped` + status: `running` | `killed` messageCount: number } @@ -2367,7 +2375,7 @@ export function applyElectricAgentsAction( if (targetIdx === undefined) return model const entities = [...model.entities] const e = entities[targetIdx]! - entities[targetIdx] = { ...e, status: `stopped` } + entities[targetIdx] = { ...e, status: `killed` } return { ...model, entities } } case `check_status`: diff --git a/packages/agents-server-conformance-tests/src/electric-agents-tests.ts b/packages/agents-server-conformance-tests/src/electric-agents-tests.ts index 05c1e3454f..ad6adc122d 100644 --- a/packages/agents-server-conformance-tests/src/electric-agents-tests.ts +++ b/packages/agents-server-conformance-tests/src/electric-agents-tests.ts @@ -343,9 +343,9 @@ export function runElectricAgentsConformanceTests( .spawn(`status-filter-agent`, `entity-1`) .spawn(`status-filter-agent`, `entity-2`) .kill() - .list({ status: `stopped` }) + .list({ status: `killed` }) .custom(async (ctx) => { - expect(ctx.lastListResult!.every((e) => e.status === `stopped`)).toBe( + expect(ctx.lastListResult!.every((e) => e.status === `killed`)).toBe( true ) }) @@ -390,7 +390,7 @@ export function runElectricAgentsConformanceTests( }) .spawn(`kill-test-agent`, `entity-1`) .kill() - .expectStatus(`stopped`) + .expectStatus(`killed`) .expectSendError(`NOT_RUNNING`, 409) .run()) @@ -416,8 +416,9 @@ export function runElectricAgentsConformanceTests( expect((msgReceived as any).value?.payload).toEqual({ before: `kill`, }) - const stopped = msgs.find((m) => m.type === `entity_stopped`) - expect(stopped).toBeDefined() + const signal = msgs.find((m) => m.type === `signal`) + expect(signal).toBeDefined() + expect((signal as any).value?.signal).toBe(`SIGKILL`) }) .run()) @@ -442,9 +443,14 @@ export function runElectricAgentsConformanceTests( secondEntityUrl = ctx.currentEntityUrl }) .custom(async (ctx) => { - const res = await electricAgentsFetch(ctx.baseUrl, firstEntityUrl!, { - method: `DELETE`, - }) + const res = await electricAgentsFetch( + ctx.baseUrl, + `${firstEntityUrl!}/signal`, + { + method: `POST`, + body: JSON.stringify({ signal: `SIGKILL` }), + } + ) expect(res.status).toBe(200) ctx.history.push({ type: `entity_killed`, @@ -491,9 +497,9 @@ export function runElectricAgentsConformanceTests( const entity = await pollEntityStatus( ctx.baseUrl, ctx.currentEntityUrl!, - [`stopped`] + [`killed`] ) - expect(entity.status).toBe(`stopped`) + expect(entity.status).toBe(`killed`) }) .expectSendError(`NOT_RUNNING`, 409) .run()) @@ -1541,9 +1547,14 @@ export function runElectricAgentsConformanceTests( scenario.custom(async (ctx: RunContext) => { const url = entityUrls[targetIdx] if (!url) return - const res = await electricAgentsFetch(ctx.baseUrl, url, { - method: `DELETE`, - }) + const res = await electricAgentsFetch( + ctx.baseUrl, + `${url}/signal`, + { + method: `POST`, + body: JSON.stringify({ signal: `SIGKILL` }), + } + ) expect(res.status).toBe(200) ctx.history.push({ type: `entity_killed`, @@ -2090,8 +2101,11 @@ export function runElectricAgentsConformanceTests( }) const res = await electricAgentsFetch( config.baseUrl, - `/${typeName}/nonexistent-id-12345`, - { method: `DELETE` } + `/${typeName}/nonexistent-id-12345/signal`, + { + method: `POST`, + body: JSON.stringify({ signal: `SIGKILL` }), + } ) expect(res.status).toBe(404) const body = (await res.json()) as { error: { code: string } } @@ -2916,9 +2930,9 @@ export function runCliConformanceTests(config: CliTestOptions): void { const entity = await pollEntityStatus( baseUrl, `/cli-kill-type/${id}`, - [`stopped`] + [`killed`] ) - expect(entity.status).toBe(`stopped`) + expect(entity.status).toBe(`killed`) }) .run() }, 15_000) @@ -2965,9 +2979,9 @@ export function runCliConformanceTests(config: CliTestOptions): void { const entity = await pollEntityStatus( baseUrl, `/cli-lifecycle-type/${id}`, - [`stopped`] + [`killed`] ) - expect(entity.status).toBe(`stopped`) + expect(entity.status).toBe(`killed`) }) .exec(`ps`, `--status`, `running`) .expectExitCode(0) diff --git a/packages/agents-server-ui/src/components/EntityHeader.tsx b/packages/agents-server-ui/src/components/EntityHeader.tsx index 4c1de0127d..ff9511f2fb 100644 --- a/packages/agents-server-ui/src/components/EntityHeader.tsx +++ b/packages/agents-server-ui/src/components/EntityHeader.tsx @@ -14,7 +14,10 @@ const STATUS_TONE: Record = { running: `info`, idle: `success`, spawning: `warning`, + paused: `warning`, + stopping: `warning`, stopped: `neutral`, + killed: `danger`, } type EntityHeaderProps = { diff --git a/packages/agents-server-ui/src/components/EntityTimeline.tsx b/packages/agents-server-ui/src/components/EntityTimeline.tsx index 1905b95984..3ced9cb00a 100644 --- a/packages/agents-server-ui/src/components/EntityTimeline.tsx +++ b/packages/agents-server-ui/src/components/EntityTimeline.tsx @@ -27,7 +27,10 @@ import { } from '../lib/timelineRowHeights' import { usePaneFindAdapterRegistration } from '../hooks/usePaneFind' import { useWorkspace } from '../hooks/useWorkspace' -import { useElectricAgents } from '../lib/ElectricAgentsProvider' +import { + useElectricAgents, + type ElectricEntity, +} from '../lib/ElectricAgentsProvider' import { warmMarkdownRenderCache } from '../lib/markdownRenderCache' import { Icon, IconButton, ScrollArea, Stack, Text, Tooltip } from '../ui' import { UserMessage } from './UserMessage' @@ -103,6 +106,7 @@ const CHAT_SURFACE_GUTTER = 24 const ROW_GAP = 24 const MANIFEST_ROW_GAP = 10 const ROW_SETTLE_MS = 500 +type EntityStatus = ElectricEntity[`status`] function timelineRowGap(row: TimelineEntry): number { return row.section.kind === `manifest` || row.section.kind === `wake` @@ -306,7 +310,7 @@ function ManifestTimelineRow({ manifest: Manifest entityUrl: string | null tileId: string | null - entityStatus?: IncludesEntity[`status`] + entityStatus?: EntityStatus }): React.ReactElement { const { helpers } = useWorkspace() const entityTarget = getManifestEntityUrl(manifest) @@ -556,16 +560,20 @@ function getManifestStateSourceId(manifest: Manifest): string | null { return null } -function statusTone(status: NonNullable) { +function statusTone(status: EntityStatus) { switch (status) { case `idle`: return `success` case `spawning`: + case `paused`: + case `stopping`: return `warning` case `running`: return `info` case `stopped`: return `neutral` + case `killed`: + return `danger` default: return `neutral` } @@ -611,6 +619,7 @@ function entityUrlsFromKey(key: string): Array { // object; splitting the props lets memo skip every settled row and // only re-render the streaming row + the row that just settled. const TimelineRow = memo(function TimelineRow({ + rowKey, section, responseTimestamp, entityStopped, @@ -619,7 +628,11 @@ const TimelineRow = memo(function TimelineRow({ entityUrl, tileId, entityStatusByUrl, + stopUserMessageKey, + stopPending, + onStopGeneration, }: { + rowKey: string section: TimelineEntry[`section`] responseTimestamp: TimelineEntry[`responseTimestamp`] entityStopped: boolean @@ -627,10 +640,20 @@ const TimelineRow = memo(function TimelineRow({ renderWidth: number entityUrl: string | null tileId: string | null - entityStatusByUrl: Map + entityStatusByUrl: Map + stopUserMessageKey: string | null + stopPending: boolean + onStopGeneration?: () => void }): React.ReactElement { if (section.kind === `user_message`) { - return + return ( + + ) } if (section.kind === `wake`) { return @@ -670,6 +693,8 @@ export function EntityTimeline({ entityUrl = null, entities = [], scrollToBottomSignal = 0, + stopPending = false, + onStopGeneration, }: { entries: Array loading: boolean @@ -680,6 +705,8 @@ export function EntityTimeline({ entityUrl?: string | null entities?: Array scrollToBottomSignal?: number + stopPending?: boolean + onStopGeneration?: () => void }): React.ReactElement { const rows = useMemo(() => entries, [entries]) const { entitiesCollection } = useElectricAgents() @@ -707,9 +734,9 @@ export function EntityTimeline({ [entitiesCollection, referencedEntityUrlKey] ) const entityStatusByUrl = useMemo(() => { - const statusByUrl = new Map() + const statusByUrl = new Map() for (const entity of entities) { - statusByUrl.set(entity.url, entity.status) + if (entity.status) statusByUrl.set(entity.url, entity.status) } for (const entity of entityStatuses) { statusByUrl.set(entity.url, entity.status) @@ -753,6 +780,21 @@ export function EntityTimeline({ return null }, [rows]) + const stopUserMessageKey = useMemo(() => { + if (!lastStreamingAgentKey) return null + const streamingIndex = rows.findIndex( + (row) => row.key === lastStreamingAgentKey + ) + if (streamingIndex < 0) return null + for (let index = streamingIndex - 1; index >= 0; index--) { + const row = rows[index] + if (row?.section.kind === `user_message`) { + return row.key + } + } + return null + }, [lastStreamingAgentKey, rows]) + const persistSettledRows = useCallback(() => { if (!cacheKey || viewportWidth <= 0) return persistTimelineRowHeights( @@ -1176,6 +1218,7 @@ export function EntityTimeline({ }} > ) diff --git a/packages/agents-server-ui/src/components/MessageInput.module.css b/packages/agents-server-ui/src/components/MessageInput.module.css index 71837a72f3..fc975661ac 100644 --- a/packages/agents-server-ui/src/components/MessageInput.module.css +++ b/packages/agents-server-ui/src/components/MessageInput.module.css @@ -193,3 +193,29 @@ .composerSend.active:hover { background: var(--ds-accent-10); } +.composerSend.stop { + background: var(--ds-accent-9); + color: var(--ds-text-on-accent); +} +.composerSend.stop:hover { + background: var(--ds-accent-10); +} +.composerSend.stopPending { + cursor: progress; + opacity: 0.72; + animation: stopPendingPulse 1s ease-in-out infinite; +} + +.composerSend.stopPending:hover { + background: var(--ds-accent-9); +} + +@keyframes stopPendingPulse { + 0%, + 100% { + transform: scale(1); + } + 50% { + transform: scale(0.92); + } +} diff --git a/packages/agents-server-ui/src/components/MessageInput.tsx b/packages/agents-server-ui/src/components/MessageInput.tsx index 645c4aa5c7..e843786269 100644 --- a/packages/agents-server-ui/src/components/MessageInput.tsx +++ b/packages/agents-server-ui/src/components/MessageInput.tsx @@ -1,5 +1,5 @@ import { useCallback, useLayoutEffect, useMemo, useRef, useState } from 'react' -import { ArrowUp } from 'lucide-react' +import { ArrowUp, Square } from 'lucide-react' import type { EntityStreamDBWithActions } from '@electric-ax/agents-runtime/client' import { createDeleteInboxMessageAction, @@ -18,20 +18,26 @@ export function MessageInput({ baseUrl, entityUrl, disabled, + generationActive = false, + stopPending = false, pendingMessages = [], inlineQueuedSubmits = false, onOptimisticQueuedMessage, drawer, onSend, + onStop, }: { db: EntityStreamDBWithActions | null baseUrl: string entityUrl: string disabled: boolean + generationActive?: boolean + stopPending?: boolean pendingMessages?: EntityTimelineData[`inbox`] inlineQueuedSubmits?: boolean onOptimisticQueuedMessage?: (message: OptimisticInboxMessage) => void onSend?: () => void + onStop?: () => void /** * Optional content rendered above the composer, sharing its docked * width and lift into the timeline above. The composer is z-indexed @@ -95,6 +101,10 @@ export function MessageInput({ return createSteerInboxMessageAction({ db, baseUrl, entityUrl }) }, [db, baseUrl, entityUrl]) + const inputText = value.trim() + const showStop = generationActive && inputText.length === 0 && !disabled + const canStop = showStop && !stopPending + const handleSubmit = useCallback(() => { if (!value.trim() || disabled) return setError(null) @@ -119,6 +129,14 @@ export function MessageInput({ }) }, [value, sendAction, updateAction, editingMessage, disabled, onSend]) + const handleComposerAction = useCallback(() => { + if (canStop) { + onStop?.() + return + } + handleSubmit() + }, [canStop, handleSubmit, onStop]) + const startEditing = useCallback( (message: EntityTimelineData[`inbox`][number]) => { const text = readTextPayload(message.payload) @@ -188,7 +206,8 @@ export function MessageInput({ [updateAction] ) - const isActive = Boolean(value.trim() && !disabled) + const isActive = Boolean(inputText && !disabled) + const isButtonActive = isActive || showStop return ( @@ -243,14 +262,24 @@ export function MessageInput({ /> diff --git a/packages/agents-server-ui/src/components/SearchPalette.tsx b/packages/agents-server-ui/src/components/SearchPalette.tsx index 5e79bdec28..a64db02853 100644 --- a/packages/agents-server-ui/src/components/SearchPalette.tsx +++ b/packages/agents-server-ui/src/components/SearchPalette.tsx @@ -320,7 +320,8 @@ export function SearchPalette(): React.ReactElement | null { if ( forkEntity && !activeEntity.parent && - activeEntity.status !== `stopped` + activeEntity.status !== `stopped` && + activeEntity.status !== `killed` ) { out.push({ kind: `action`, @@ -343,7 +344,11 @@ export function SearchPalette(): React.ReactElement | null { }) } - if (killEntity && activeEntity.status !== `stopped`) { + if ( + killEntity && + activeEntity.status !== `stopped` && + activeEntity.status !== `killed` + ) { out.push({ kind: `action`, id: `kill-current-entity`, diff --git a/packages/agents-server-ui/src/components/SidebarRow.tsx b/packages/agents-server-ui/src/components/SidebarRow.tsx index 6f1598443e..8d40beaca3 100644 --- a/packages/agents-server-ui/src/components/SidebarRow.tsx +++ b/packages/agents-server-ui/src/components/SidebarRow.tsx @@ -103,7 +103,7 @@ export const SidebarRow = memo(function SidebarRow({ hoverHandle, }: SidebarRowProps): React.ReactElement { const { title } = getEntityDisplayTitle(entity) - const isStopped = entity.status === `stopped` + const isStopped = entity.status === `stopped` || entity.status === `killed` const hasChildren = childCount > 0 const sessionId = entity.url.replace(/^\//, ``) const className = [ diff --git a/packages/agents-server-ui/src/components/StatusDot.tsx b/packages/agents-server-ui/src/components/StatusDot.tsx index 8acb85f05f..879366884b 100644 --- a/packages/agents-server-ui/src/components/StatusDot.tsx +++ b/packages/agents-server-ui/src/components/StatusDot.tsx @@ -5,7 +5,10 @@ const STATUS_COLORS: Record = { running: `var(--ds-blue-9)`, idle: `var(--ds-green-9)`, spawning: `var(--ds-amber-9)`, + paused: `var(--ds-amber-9)`, + stopping: `var(--ds-amber-9)`, stopped: `var(--ds-gray-8)`, + killed: `var(--ds-red-9)`, } export function StatusDot({ diff --git a/packages/agents-server-ui/src/components/UserMessage.module.css b/packages/agents-server-ui/src/components/UserMessage.module.css index 9431319d37..35aab2141d 100644 --- a/packages/agents-server-ui/src/components/UserMessage.module.css +++ b/packages/agents-server-ui/src/components/UserMessage.module.css @@ -18,6 +18,63 @@ box-shadow: 0 1px 3px rgba(15, 15, 30, 0.04), 0 1px 1px rgba(15, 15, 30, 0.02); + position: relative; +} + +.bubble.withStop { + padding-right: 44px; +} + +.stopButton { + all: unset; + position: absolute; + top: 8px; + right: 8px; + display: inline-flex; + align-items: center; + justify-content: center; + width: 24px; + height: 24px; + border-radius: var(--ds-radius-full); + background: var(--ds-accent-9); + color: var(--ds-text-on-accent); + cursor: pointer; + box-shadow: + 0 1px 3px rgba(15, 15, 30, 0.08), + 0 1px 1px rgba(15, 15, 30, 0.04); + transition: background 0.12s ease; +} + +.stopButton:hover { + background: var(--ds-accent-10); +} + +.stopButton:disabled { + cursor: progress; +} + +.stopPending { + opacity: 0.72; + animation: stopPendingPulse 1s ease-in-out infinite; +} + +.stopPending:hover { + background: var(--ds-accent-9); +} + +.stopButton:focus-visible { + outline: 2px solid var(--ds-accent-a6); + outline-offset: 2px; +} + +@keyframes stopPendingPulse { + 0%, + 100% { + transform: scale(1); + } + 50% { + transform: scale(0.92); + } } .body { diff --git a/packages/agents-server-ui/src/components/UserMessage.tsx b/packages/agents-server-ui/src/components/UserMessage.tsx index 3113ddd26c..e7ce15e59a 100644 --- a/packages/agents-server-ui/src/components/UserMessage.tsx +++ b/packages/agents-server-ui/src/components/UserMessage.tsx @@ -1,6 +1,7 @@ import { memo } from 'react' +import { Square } from 'lucide-react' import type { EntityTimelineSection } from '@electric-ax/agents-runtime/client' -import { Stack, Text } from '../ui' +import { Icon, Stack, Text } from '../ui' import { TimeText } from './TimeText' import styles from './UserMessage.module.css' @@ -11,14 +12,42 @@ type UserMessageSection = Extract< export const UserMessage = memo(function UserMessage({ section, + showStop = false, + stopPending = false, + onStop, }: { section: UserMessageSection + showStop?: boolean + stopPending?: boolean + onStop?: () => void }): React.ReactElement { const sender = formatSender(section.from) return ( - + + {showStop && onStop && ( + + )} {section.text} diff --git a/packages/agents-server-ui/src/components/views/ChatView.tsx b/packages/agents-server-ui/src/components/views/ChatView.tsx index 3dcaee909a..c8bdb6bea1 100644 --- a/packages/agents-server-ui/src/components/views/ChatView.tsx +++ b/packages/agents-server-ui/src/components/views/ChatView.tsx @@ -5,6 +5,7 @@ import { EntityTimeline } from '../EntityTimeline' import { MessageInput } from '../MessageInput' import { EntityContextDrawer } from '../EntityContextDrawer' import { readTextPayload } from '../../lib/sendMessage' +import { useElectricAgents } from '../../lib/ElectricAgentsProvider' import type { ViewProps } from '../../lib/workspace/viewRegistry' import type { TimelineEntry } from '../../lib/timelineEntries' import type { OptimisticInboxMessage } from '../../lib/sendMessage' @@ -68,8 +69,10 @@ function GenericChatBody({ loading, error, } = useEntityTimeline(baseUrl || null, entityUrl) + const { signalEntity } = useElectricAgents() const navigate = useNavigate() const [sentMessageSignal, setSentMessageSignal] = useState(0) + const [stopPending, setStopPending] = useState(false) const [inlineQueuedMessages, setInlineQueuedMessages] = useState< Map >(() => new Map()) @@ -186,6 +189,29 @@ function GenericChatBody({ } }, [error, navigate, isSpawning]) + useEffect(() => { + if (!generationActive) { + setStopPending(false) + } + }, [generationActive]) + + useEffect(() => { + setStopPending(false) + }, [entityUrl]) + + const stopGeneration = useCallback(() => { + if (!entityUrl || !signalEntity || !generationActive || stopPending) return + setStopPending(true) + const tx = signalEntity({ + entityUrl, + signal: `SIGINT`, + reason: `Stopped from chat UI`, + }) + tx.isPersisted.promise.catch(() => { + setStopPending(false) + }) + }, [entityUrl, generationActive, signalEntity, stopPending]) + return ( <> )} onSend={() => setSentMessageSignal((value) => value + 1)} + onStop={stopGeneration} /> ) diff --git a/packages/agents-server-ui/src/components/workspace/SplitMenu.module.css b/packages/agents-server-ui/src/components/workspace/SplitMenu.module.css index a8556b6743..d55ab5a689 100644 --- a/packages/agents-server-ui/src/components/workspace/SplitMenu.module.css +++ b/packages/agents-server-ui/src/components/workspace/SplitMenu.module.css @@ -16,6 +16,86 @@ user-select: none; } +.submenuTrigger { + display: flex; + align-items: center; + gap: 6px; + min-height: var(--ds-row-height-md); + padding: 3px 3px 3px 8px; + border-radius: var(--ds-radius-item); + font-size: var(--ds-text-sm); + line-height: 1.3; + font-family: var(--ds-font-body); + color: var(--ds-text-1); + cursor: pointer; + outline: none; + user-select: none; + text-align: start; + width: 100%; + box-sizing: border-box; +} + +.submenuTrigger[data-highlighted] { + background: var(--ds-bg-hover); +} + +.submenuChevron { + margin-left: auto; + margin-right: 5px; + color: var(--ds-text-3); +} + +.signalRow { + display: grid; + grid-template-columns: minmax(0, 1fr) max-content; + grid-template-rows: auto auto; + column-gap: 18px; + row-gap: 1px; + align-items: baseline; + width: 100%; + margin-right: 5px; +} + +.signalName { + grid-column: 1; + grid-row: 1; + min-width: 0; + color: var(--ds-text-1); + font-size: var(--ds-text-sm); + line-height: 1.3; +} + +.signalDescription { + grid-column: 1; + grid-row: 2; + min-width: 0; + color: var(--ds-text-3); + font-size: var(--ds-font-size-1, 12px); + line-height: 1.3; +} + +.signalShortcut { + grid-column: 2; + grid-row: 1; + justify-self: end; + color: var(--ds-gray-11); + font-size: var(--ds-text-sm); + line-height: 1.3; +} + +.signalCode { + grid-column: 2; + grid-row: 2; + justify-self: end; + transform: translateY(-0.5px); + color: var(--ds-text-3); + font-family: var(--ds-font-mono); + font-size: var(--ds-font-size-1, 11px); + line-height: 1.3; + letter-spacing: 0.04em; + white-space: nowrap; +} + /* ---- ViewRow ----------------------------------------------------- */ /* Layout inside the Menu.Item: * [icon] Label [✓?] (spacer) [→][↓] diff --git a/packages/agents-server-ui/src/components/workspace/SplitMenu.tsx b/packages/agents-server-ui/src/components/workspace/SplitMenu.tsx index 2a90c93954..86ce447824 100644 --- a/packages/agents-server-ui/src/components/workspace/SplitMenu.tsx +++ b/packages/agents-server-ui/src/components/workspace/SplitMenu.tsx @@ -1,15 +1,17 @@ -import { useState } from 'react' +import { Fragment, useState } from 'react' import { + ChevronRight, Copy, Eye, GitFork, Link2, MoreHorizontal, + OctagonX, Pin, PinOff, + Radio, SplitSquareHorizontal, SplitSquareVertical, - Trash2, X, } from 'lucide-react' import { useNavigate } from '@tanstack/react-router' @@ -31,10 +33,90 @@ import { } from '../../ui' import { modKeyLabel } from '../../lib/keyLabels' import { getEntityDisplayTitle } from '../../lib/entityDisplay' -import type { ElectricEntity } from '../../lib/ElectricAgentsProvider' +import type { + ElectricEntity, + EntitySignal, +} from '../../lib/ElectricAgentsProvider' import type { Tile } from '../../lib/workspace/types' import styles from './SplitMenu.module.css' +const SIGNAL_OPTION_GROUPS: ReadonlyArray< + ReadonlyArray<{ + id: string + shortName: string + description: string + signal?: EntitySignal + composite?: `stop-immediately` + shortcut?: string + code: string + }> +> = [ + [ + { + id: `interrupt`, + signal: `SIGINT`, + shortName: `Interrupt`, + description: `Abort active run and continue`, + shortcut: `Esc`, + code: `SIGINT`, + }, + { + id: `stop-immediately`, + composite: `stop-immediately`, + shortName: `Stop immediately`, + description: `Abort active run and pause`, + shortcut: `Ctrl+C`, + code: `SIGSTOP+SIGINT`, + }, + ], + [ + { + id: `stop`, + signal: `SIGSTOP`, + shortName: `Stop`, + description: `Pause after current run`, + code: `SIGSTOP`, + }, + { + id: `reload`, + signal: `SIGHUP`, + shortName: `Reload`, + description: `Reload after current run`, + code: `SIGHUP`, + }, + { + id: `resume`, + signal: `SIGCONT`, + shortName: `Resume`, + description: `Resume paused work`, + code: `SIGCONT`, + }, + { + id: `custom`, + signal: `SIGUSR`, + shortName: `Custom`, + description: `Deliver to signal handler`, + code: `SIGUSR`, + }, + ], + [ + { + id: `terminate`, + signal: `SIGTERM`, + shortName: `Terminate`, + description: `Gracefully stop permanently`, + code: `SIGTERM`, + }, + { + id: `kill`, + signal: `SIGKILL`, + shortName: `Kill`, + description: `Immediately kill permanently`, + code: `SIGKILL`, + }, + ], +] + /** * Per-tile workspace menu. Shown in the tile header (the `…` button) * and contains, in order: @@ -71,11 +153,13 @@ export function SplitMenu({ entity: ElectricEntity | null }): React.ReactElement { const { workspace, helpers } = useWorkspace() - const { forkEntity, killEntity } = useElectricAgents() + const { forkEntity, killEntity, signalEntity } = useElectricAgents() const { pinnedUrls, togglePin } = usePinnedEntities() const navigate = useNavigate() const hasEntity = entity !== null && tile.entityUrl !== null const entityUrl = tile.entityUrl + const entityTerminal = + entity?.status === `stopped` || entity?.status === `killed` const pinned = entityUrl !== null && pinnedUrls.includes(entityUrl) // Hide "Close tile" when this is the only tile in the workspace — // closing it would leave the workspace empty (which the URL ↔ @@ -118,6 +202,47 @@ export function SplitMenu({ tx.isPersisted.promise.catch(() => {}) } + const handleStopImmediately = () => { + if (!signalEntity || entityUrl === null) return + const stopTx = signalEntity({ + entityUrl, + signal: `SIGSTOP`, + reason: `Stopped immediately from tile menu`, + }) + stopTx.isPersisted.promise + .then(() => { + const interruptTx = signalEntity({ + entityUrl, + signal: `SIGINT`, + reason: `Interrupted current run for immediate stop`, + }) + interruptTx.isPersisted.promise.catch(() => {}) + }) + .catch(() => {}) + } + + const handleSignal = (signal: EntitySignal) => { + if (!signalEntity || entityUrl === null) return + const tx = signalEntity({ + entityUrl, + signal, + reason: `Sent from tile menu`, + }) + tx.isPersisted.promise.catch(() => {}) + } + + const handleSignalOption = ( + option: (typeof SIGNAL_OPTION_GROUPS)[number][number] + ) => { + if (option.composite === `stop-immediately`) { + handleStopImmediately() + return + } + if (option.signal) { + handleSignal(option.signal) + } + } + const handleCopyLayoutLink = () => { // Encode the workspace into the DSL and append it as `?layout=…` // to the current URL. The receiving window's picks it @@ -233,14 +358,54 @@ export function SplitMenu({ )} {hasEntity && entity && forkEntity && !entity.parent && ( - + Fork subtree )} + {hasEntity && entity && !entityTerminal && signalEntity && ( + + + + Send signal + + + + {SIGNAL_OPTION_GROUPS.map((group, groupIndex) => ( + + {groupIndex > 0 && } + {group.map((option) => ( + handleSignalOption(option)} + > + + + {option.shortName} + + {option.shortcut && ( + + {option.shortcut} + + )} + + {option.description} + + + {option.code} + + + + ))} + + ))} + + + )} {!isOnlyTile && ( <> @@ -253,14 +418,11 @@ export function SplitMenu({ )} - {hasEntity && entity && entity.status !== `stopped` && killEntity && ( + {hasEntity && entity && !entityTerminal && killEntity && ( <> - setShowKillConfirm(true)} - tone="danger" - > - + setShowKillConfirm(true)}> + Kill entity diff --git a/packages/agents-server-ui/src/components/workspace/TileContainer.tsx b/packages/agents-server-ui/src/components/workspace/TileContainer.tsx index a7a27fed14..00874c4f34 100644 --- a/packages/agents-server-ui/src/components/workspace/TileContainer.tsx +++ b/packages/agents-server-ui/src/components/workspace/TileContainer.tsx @@ -49,7 +49,12 @@ export function TileContainer({ }, [isActive, tile.id, helpers]) return ( -
+
{tile.entityUrl !== null ? ( helpers.setTileView(tile.id, viewId), diff --git a/packages/agents-server-ui/src/hooks/useHotkey.ts b/packages/agents-server-ui/src/hooks/useHotkey.ts index 652928d6fa..c1fe16a62b 100644 --- a/packages/agents-server-ui/src/hooks/useHotkey.ts +++ b/packages/agents-server-ui/src/hooks/useHotkey.ts @@ -5,6 +5,8 @@ type HotkeyOptions = { ignoreInputs?: boolean /** Disable the hook (handler is not registered). */ disabled?: boolean + /** Listen during capture so focused controls cannot consume the shortcut first. */ + capture?: boolean } const isMac = (): boolean => @@ -38,7 +40,7 @@ export function useHotkey( handler: (e: KeyboardEvent) => void, options: HotkeyOptions = {} ): void { - const { ignoreInputs = true, disabled = false } = options + const { ignoreInputs = true, disabled = false, capture = false } = options useEffect(() => { if (disabled) return @@ -64,9 +66,9 @@ export function useHotkey( handler(e) } - window.addEventListener(`keydown`, onKeyDown) - return () => window.removeEventListener(`keydown`, onKeyDown) - }, [combo, handler, ignoreInputs, disabled]) + window.addEventListener(`keydown`, onKeyDown, { capture }) + return () => window.removeEventListener(`keydown`, onKeyDown, { capture }) + }, [combo, handler, ignoreInputs, disabled, capture]) } export const isMacPlatform = isMac diff --git a/packages/agents-server-ui/src/lib/ElectricAgentsProvider.tsx b/packages/agents-server-ui/src/lib/ElectricAgentsProvider.tsx index d2a2bc59f3..8c08f65a6c 100644 --- a/packages/agents-server-ui/src/lib/ElectricAgentsProvider.tsx +++ b/packages/agents-server-ui/src/lib/ElectricAgentsProvider.tsx @@ -8,7 +8,23 @@ import type { ReactNode } from 'react' import { serverFetch } from './auth-fetch' import { entityApiUrl, entitySpawnApiUrl } from './entity-api' -type EntityStatus = `spawning` | `running` | `idle` | `stopped` +type EntityStatus = + | `spawning` + | `running` + | `idle` + | `paused` + | `stopping` + | `stopped` + | `killed` + +export type EntitySignal = + | `SIGINT` + | `SIGHUP` + | `SIGTERM` + | `SIGKILL` + | `SIGSTOP` + | `SIGCONT` + | `SIGUSR` // --- Schemas --- @@ -16,7 +32,10 @@ const ENTITY_STATUSES: [EntityStatus, ...Array] = [ `spawning`, `running`, `idle`, + `paused`, + `stopping`, `stopped`, + `killed`, ] const entitySchema = z.object({ @@ -251,6 +270,13 @@ interface SpawnInput { dispatch_policy?: RunnerDispatchPolicy } +export interface SignalInput { + entityUrl: string + signal: EntitySignal + reason?: string + payload?: unknown +} + function createSpawnAction( baseUrl: string, entitiesCollection: EntitiesCollection @@ -321,13 +347,21 @@ function createKillAction( return createOptimisticAction({ onMutate: (entityUrl) => { entitiesCollection.update(entityUrl, (draft) => { - draft.status = `stopped` + draft.status = `killed` }) }, mutationFn: async (entityUrl) => { - const res = await serverFetch(entityApiUrl(baseUrl, entityUrl), { - method: `DELETE`, - }) + const res = await serverFetch( + entityApiUrl(baseUrl, entityUrl, `/signal`), + { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify({ + signal: `SIGKILL`, + reason: `Killed from agents UI`, + }), + } + ) if (!res.ok) { const text = await res.text().catch(() => ``) throw new Error(text || `Kill failed (${res.status})`) @@ -338,6 +372,59 @@ function createKillAction( }) } +function optimisticStatusForSignal(signal: EntitySignal): EntityStatus | null { + switch (signal) { + case `SIGKILL`: + return `killed` + case `SIGINT`: + case `SIGTERM`: + return `stopping` + case `SIGSTOP`: + return `paused` + case `SIGCONT`: + return `idle` + case `SIGHUP`: + case `SIGUSR`: + return null + } +} + +function createSignalAction( + baseUrl: string, + entitiesCollection: EntitiesCollection +) { + return createOptimisticAction({ + onMutate: ({ entityUrl, signal }) => { + const optimisticStatus = optimisticStatusForSignal(signal) + if (!optimisticStatus) return + + entitiesCollection.update(entityUrl, (draft) => { + draft.status = optimisticStatus + }) + }, + mutationFn: async ({ entityUrl, signal, reason, payload }) => { + const body: Record = { signal } + if (reason !== undefined) body.reason = reason + if (payload !== undefined) body.payload = payload + + const res = await serverFetch( + entityApiUrl(baseUrl, entityUrl, `/signal`), + { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify(body), + } + ) + if (!res.ok) { + const text = await res.text().catch(() => ``) + throw new Error(text || `Signal failed (${res.status})`) + } + const data = (await res.json()) as { txid: number } + return { txid: data.txid } + }, + }) +} + function createForkEntity(baseUrl: string) { return async (entityUrl: string): Promise<{ url: string }> => { const res = await serverFetch(entityApiUrl(baseUrl, entityUrl, `/fork`), { @@ -374,6 +461,7 @@ interface ElectricAgentsState { entityTypesCollection: EntityTypesCollection | null runnersCollection: RunnersCollection | null spawnEntity: ReturnType | null + signalEntity: ReturnType | null killEntity: ReturnType | null forkEntity: ReturnType | null } @@ -383,6 +471,7 @@ const ElectricAgentsContext = createContext({ entityTypesCollection: null, runnersCollection: null, spawnEntity: null, + signalEntity: null, killEntity: null, forkEntity: null, }) @@ -412,6 +501,7 @@ export function ElectricAgentsProvider({ entityTypesCollection: null, runnersCollection: null, spawnEntity: null, + signalEntity: null, killEntity: null, forkEntity: null, } @@ -424,6 +514,7 @@ export function ElectricAgentsProvider({ entityTypesCollection: entityTypes, runnersCollection: runners, spawnEntity: createSpawnAction(baseUrl, entities), + signalEntity: createSignalAction(baseUrl, entities), killEntity: createKillAction(baseUrl, entities), forkEntity: createForkEntity(baseUrl), } diff --git a/packages/agents-server-ui/src/lib/sessionGroups.ts b/packages/agents-server-ui/src/lib/sessionGroups.ts index 78f8d393d0..78d36277be 100644 --- a/packages/agents-server-ui/src/lib/sessionGroups.ts +++ b/packages/agents-server-ui/src/lib/sessionGroups.ts @@ -160,15 +160,18 @@ export function groupByType( } /** - * Group by `status`, ordered by lifecycle (running → idle → spawning - * → stopped) so the user's eye lands on currently-active sessions + * Group by `status`, ordered by lifecycle so the user's eye lands on + * currently-active sessions * first. Same in-group sort as `groupByType`. */ const STATUS_ORDER: Record = { running: 0, idle: 1, - spawning: 2, - stopped: 3, + paused: 2, + spawning: 3, + stopping: 4, + stopped: 5, + killed: 6, } export function groupByStatus( diff --git a/packages/agents-server-ui/src/lib/types.ts b/packages/agents-server-ui/src/lib/types.ts index f08985b6a3..e1ff97e046 100644 --- a/packages/agents-server-ui/src/lib/types.ts +++ b/packages/agents-server-ui/src/lib/types.ts @@ -21,7 +21,14 @@ export interface ServerConfig { tenantId?: string } -export type PublicEntityStatus = `spawning` | `running` | `idle` | `stopped` +export type PublicEntityStatus = + | `spawning` + | `running` + | `idle` + | `paused` + | `stopping` + | `stopped` + | `killed` export interface PublicEntity { url: string diff --git a/packages/agents-server-ui/src/router.tsx b/packages/agents-server-ui/src/router.tsx index 8030d23fb4..b08f5965f2 100644 --- a/packages/agents-server-ui/src/router.tsx +++ b/packages/agents-server-ui/src/router.tsx @@ -10,9 +10,15 @@ import { useNavigate, useParams, } from '@tanstack/react-router' +import { useLiveQuery } from '@tanstack/react-db' +import { eq } from '@tanstack/db' import { z } from 'zod' import { getActiveBaseUrl, preloadEntityStream } from './lib/entity-connection' -import { preloadAppCollections } from './lib/ElectricAgentsProvider' +import { + preloadAppCollections, + useElectricAgents, + type EntitySignal, +} from './lib/ElectricAgentsProvider' import { usePinnedEntities } from './hooks/usePinnedEntities' import { SidebarCollapsedProvider, @@ -85,10 +91,23 @@ function RootShell(): React.ReactElement { } = useSidebarCollapsed() const search = useSearchPalette() const { workspace, helpers } = useWorkspace() + const { entitiesCollection, signalEntity } = useElectricAgents() const { openFindForTile, findNextInTile, findPreviousInTile } = usePaneFindCommands() const nativeMenuHandlesAppHotkeys = typeof window !== `undefined` && Boolean(window.electronAPI) + const activeEntityUrl = helpers.activeTile?.entityUrl ?? null + const { data: activeEntityMatches = [] } = useLiveQuery( + (q) => { + if (!entitiesCollection || !activeEntityUrl) return undefined + return q + .from({ entity: entitiesCollection }) + .where(({ entity }) => eq(entity.url, activeEntityUrl)) + }, + [entitiesCollection, activeEntityUrl] + ) + const keyboardSignalEntityIsRunning = + activeEntityMatches.at(0)?.status === `running` useHotkey(`mod+b`, toggle, { disabled: nativeMenuHandlesAppHotkeys }) useHotkey( @@ -150,6 +169,55 @@ function RootShell(): React.ReactElement { openNewSession() }) + const signalActiveTile = useCallback( + (signal: EntitySignal, reason: string) => { + if (!keyboardSignalEntityIsRunning) return false + const entityUrl = activeEntityUrl + if (!entityUrl || !signalEntity) return false + const tx = signalEntity({ entityUrl, signal, reason }) + tx.isPersisted.promise.catch(() => {}) + return true + }, + [activeEntityUrl, keyboardSignalEntityIsRunning, signalEntity] + ) + const stopActiveTile = useCallback(() => { + if (!keyboardSignalEntityIsRunning) return false + const entityUrl = activeEntityUrl + if (!entityUrl || !signalEntity) return false + const stopTx = signalEntity({ + entityUrl, + signal: `SIGSTOP`, + reason: `Stopped immediately from keyboard`, + }) + stopTx.isPersisted.promise + .then(() => { + const interruptTx = signalEntity({ + entityUrl, + signal: `SIGINT`, + reason: `Interrupted current run for immediate stop`, + }) + interruptTx.isPersisted.promise.catch(() => {}) + }) + .catch(() => {}) + return true + }, [activeEntityUrl, keyboardSignalEntityIsRunning, signalEntity]) + useHotkey( + `escape`, + (e) => { + if (!signalActiveTile(`SIGINT`, `Interrupted from keyboard`)) return + e.preventDefault() + }, + { ignoreInputs: false, capture: true } + ) + useHotkey( + `ctrl+c`, + (e) => { + if (!stopActiveTile()) return + e.preventDefault() + }, + { ignoreInputs: false, capture: true } + ) + useWorkspaceHotkeys({ disabled: nativeMenuHandlesAppHotkeys }) useWorkspacePersistence() useDocumentTitle() diff --git a/packages/agents-server/drizzle/0007_entity_signal_statuses.sql b/packages/agents-server/drizzle/0007_entity_signal_statuses.sql new file mode 100644 index 0000000000..3082edade8 --- /dev/null +++ b/packages/agents-server/drizzle/0007_entity_signal_statuses.sql @@ -0,0 +1,3 @@ +ALTER TABLE "entities" DROP CONSTRAINT "chk_entities_status"; +--> statement-breakpoint +ALTER TABLE "entities" ADD CONSTRAINT "chk_entities_status" CHECK ("entities"."status" IN ('spawning', 'running', 'idle', 'paused', 'stopping', 'stopped', 'killed')); diff --git a/packages/agents-server/drizzle/0009_entity_signal_statuses.sql b/packages/agents-server/drizzle/0009_entity_signal_statuses.sql new file mode 100644 index 0000000000..3082edade8 --- /dev/null +++ b/packages/agents-server/drizzle/0009_entity_signal_statuses.sql @@ -0,0 +1,3 @@ +ALTER TABLE "entities" DROP CONSTRAINT "chk_entities_status"; +--> statement-breakpoint +ALTER TABLE "entities" ADD CONSTRAINT "chk_entities_status" CHECK ("entities"."status" IN ('spawning', 'running', 'idle', 'paused', 'stopping', 'stopped', 'killed')); diff --git a/packages/agents-server/drizzle/meta/_journal.json b/packages/agents-server/drizzle/meta/_journal.json index 4574ed99be..994aceeb3d 100644 --- a/packages/agents-server/drizzle/meta/_journal.json +++ b/packages/agents-server/drizzle/meta/_journal.json @@ -64,6 +64,13 @@ "when": 1778976000000, "tag": "0008_runner_runtime_diagnostics", "breakpoints": true + }, + { + "idx": 9, + "version": "7", + "when": 1778540000000, + "tag": "0009_entity_signal_statuses", + "breakpoints": true } ] } diff --git a/packages/agents-server/src/db/schema.ts b/packages/agents-server/src/db/schema.ts index 15a1d234bc..2cc0994de2 100644 --- a/packages/agents-server/src/db/schema.ts +++ b/packages/agents-server/src/db/schema.ts @@ -66,7 +66,7 @@ export const entities = pgTable( index(`entities_tags_index_gin`).using(`gin`, table.tagsIndex), check( `chk_entities_status`, - sql`${table.status} IN ('spawning', 'running', 'idle', 'stopped')` + sql`${table.status} IN ('spawning', 'running', 'idle', 'paused', 'stopping', 'stopped', 'killed')` ), ] ) diff --git a/packages/agents-server/src/electric-agents-types.ts b/packages/agents-server/src/electric-agents-types.ts index 26482350ed..3dc88d2f4a 100644 --- a/packages/agents-server/src/electric-agents-types.ts +++ b/packages/agents-server/src/electric-agents-types.ts @@ -15,13 +15,42 @@ export type AuthenticateRequest = ( request: Request ) => Promise | Principal | null -export type EntityStatus = `spawning` | `running` | `idle` | `stopped` +export type EntityStatus = + | `spawning` + | `running` + | `idle` + | `paused` + | `stopping` + | `stopped` + | `killed` + +export type EntitySignal = + | `SIGINT` + | `SIGHUP` + | `SIGTERM` + | `SIGKILL` + | `SIGSTOP` + | `SIGCONT` + | `SIGUSR` const VALID_ENTITY_STATUSES = new Set([ `spawning`, `running`, `idle`, + `paused`, + `stopping`, `stopped`, + `killed`, +]) + +const VALID_ENTITY_SIGNALS = new Set([ + `SIGINT`, + `SIGHUP`, + `SIGTERM`, + `SIGKILL`, + `SIGSTOP`, + `SIGCONT`, + `SIGUSR`, ]) export function assertEntityStatus(s: string): EntityStatus { @@ -236,6 +265,41 @@ export interface ConsumerClaim { updated_at: string } +export function assertEntitySignal(s: string): EntitySignal { + if (!VALID_ENTITY_SIGNALS.has(s)) { + throw new Error(`Invalid entity signal: "${s}"`) + } + return s as EntitySignal +} + +export function isTerminalEntityStatus(status: EntityStatus): boolean { + return status === `stopped` || status === `killed` +} + +export function rejectsNormalWrites(status: EntityStatus): boolean { + return status === `stopping` || isTerminalEntityStatus(status) +} + +export function expectedSignalStatus( + status: EntityStatus, + signal: EntitySignal +): EntityStatus { + switch (signal) { + case `SIGKILL`: + return `killed` + case `SIGTERM`: + return status === `idle` ? `stopped` : `stopping` + case `SIGSTOP`: + return status === `idle` ? `paused` : status + case `SIGCONT`: + return status === `paused` ? `idle` : status + case `SIGINT`: + case `SIGHUP`: + case `SIGUSR`: + return status + } +} + export interface ElectricAgentsEntity { url: string type: string @@ -353,6 +417,21 @@ export interface SendRequest { position?: string } +export interface SignalRequest { + signal: EntitySignal + reason?: string + payload?: unknown +} + +export interface SignalResponse { + url: string + signal: EntitySignal + previous_state: EntityStatus + new_state: EntityStatus + created_at: number + txid: number +} + export interface SetTagRequest { value: string } @@ -369,6 +448,7 @@ export const ErrCodeNoSubscription = `NO_SUBSCRIPTION` export const ErrCodeNotFound = `NOT_FOUND` export const ErrCodeNotRunning = `NOT_RUNNING` export const ErrCodeInvalidRequest = `INVALID_REQUEST` +export const ErrCodeInvalidSignal = `INVALID_SIGNAL` export const ErrCodeUnknownEntityType = `UNKNOWN_ENTITY_TYPE` export const ErrCodeSchemaValidationFailed = `SCHEMA_VALIDATION_FAILED` export const ErrCodeUnknownMessageType = `UNKNOWN_MESSAGE_TYPE` diff --git a/packages/agents-server/src/entity-manager.ts b/packages/agents-server/src/entity-manager.ts index 3c7bcd246d..3858e080e8 100644 --- a/packages/agents-server/src/entity-manager.ts +++ b/packages/agents-server/src/entity-manager.ts @@ -16,6 +16,7 @@ import { ErrCodeEntityPersistFailed, ErrCodeForkInProgress, ErrCodeForkWaitTimeout, + ErrCodeInvalidSignal, ErrCodeInvalidRequest, ErrCodeNotFound, ErrCodeNotRunning, @@ -25,6 +26,8 @@ import { ErrCodeUnknownEntityType, ErrCodeUnknownEventType, ErrCodeUnknownMessageType, + isTerminalEntityStatus, + rejectsNormalWrites, } from './electric-agents-types.js' import { parseDispatchPolicy } from './dispatch-policy-schema.js' import { applyTypeDefaultSubscriptionScope } from './routing/dispatch-policy.js' @@ -54,9 +57,12 @@ import type { DispatchPolicy, ElectricAgentsEntity, ElectricAgentsEntityType, + EntitySignal, RegisterEntityTypeRequest, SendRequest, SetTagRequest, + SignalRequest, + SignalResponse, TypedSpawnRequest, } from './electric-agents-types.js' import type { EntityBridgeCoordinator } from './entity-bridge-manager.js' @@ -103,6 +109,8 @@ type ForkResult = { const DEFAULT_FORK_WAIT_TIMEOUT_MS = 120_000 const DEFAULT_FORK_WAIT_POLL_MS = 250 +const SERVER_SIGNAL_SENDER = `/_electric/server` + function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)) } @@ -928,16 +936,20 @@ export class EntityManager { } const subtree = await this.listEntitySubtree(root) - const stopped = subtree.find((entity) => entity.status === `stopped`) + const stopped = subtree.find((entity) => + isTerminalEntityStatus(entity.status) + ) if (stopped) { throw new ElectricAgentsError( ErrCodeNotRunning, - `Cannot fork stopped entity "${stopped.url}"`, + `Cannot fork terminal entity "${stopped.url}"`, 409 ) } - let active = subtree.filter((entity) => entity.status !== `idle`) + let active = subtree.filter( + (entity) => entity.status !== `idle` && entity.status !== `paused` + ) if (active.length === 0) { this.addForkLocks( this.forkWorkLockedEntities, @@ -959,7 +971,7 @@ export class EntityManager { workLocks ) const lockedActive = lockedSubtree.filter( - (entity) => entity.status !== `idle` + (entity) => entity.status !== `idle` && entity.status !== `paused` ) if (lockedActive.length === 0) { return lockedSubtree @@ -1673,6 +1685,12 @@ export class EntityManager { value.processed_at = now } + const wakePausedEntity = entity.status === `paused` && req.mode !== `paused` + if (wakePausedEntity) { + await this.registry.updateStatus(entityUrl, `idle`) + await this.entityBridgeManager?.onEntityChanged(entityUrl) + } + const envelope = entityStateSchema.inbox.insert({ key, value, @@ -1725,8 +1743,12 @@ export class EntityManager { if (!entity) { throw new ElectricAgentsError(ErrCodeNotFound, `Entity not found`, 404) } - if (entity.status === `stopped`) { - throw new ElectricAgentsError(ErrCodeNotRunning, `Entity is stopped`, 409) + if (rejectsNormalWrites(entity.status)) { + throw new ElectricAgentsError( + ErrCodeNotRunning, + `Entity is not accepting writes`, + 409 + ) } const now = new Date().toISOString() @@ -1763,8 +1785,12 @@ export class EntityManager { if (!entity) { throw new ElectricAgentsError(ErrCodeNotFound, `Entity not found`, 404) } - if (entity.status === `stopped`) { - throw new ElectricAgentsError(ErrCodeNotRunning, `Entity is stopped`, 409) + if (rejectsNormalWrites(entity.status)) { + throw new ElectricAgentsError( + ErrCodeNotRunning, + `Entity is not accepting writes`, + 409 + ) } const envelope = entityStateSchema.inbox.delete({ key } as any) @@ -1796,8 +1822,12 @@ export class EntityManager { 401 ) } - if (entity.status === `stopped`) { - throw new ElectricAgentsError(ErrCodeNotRunning, `Entity is stopped`, 409) + if (rejectsNormalWrites(entity.status)) { + throw new ElectricAgentsError( + ErrCodeNotRunning, + `Entity is not accepting writes`, + 409 + ) } if (typeof req.value !== `string`) { @@ -1842,8 +1872,12 @@ export class EntityManager { 401 ) } - if (entity.status === `stopped`) { - throw new ElectricAgentsError(ErrCodeNotRunning, `Entity is stopped`, 409) + if (rejectsNormalWrites(entity.status)) { + throw new ElectricAgentsError( + ErrCodeNotRunning, + `Entity is not accepting writes`, + 409 + ) } const result = await this.registry.removeEntityTag(entityUrl, key) @@ -2354,37 +2388,177 @@ export class EntityManager { } // ========================================================================== - // Kill + // Signals // ========================================================================== - async kill(entityUrl: string): Promise<{ txid: number }> { + async signal(entityUrl: string, req: SignalRequest): Promise { const entity = await this.registry.getEntity(entityUrl) if (!entity) { throw new ElectricAgentsError(ErrCodeNotFound, `Entity not found`, 404) } - await this.wakeRegistry.unregisterBySubscriber(entityUrl, this.tenantId) - await this.wakeRegistry.unregisterBySource(entityUrl, this.tenantId) + if (isTerminalEntityStatus(entity.status)) { + throw new ElectricAgentsError( + ErrCodeInvalidSignal, + `Cannot signal a ${entity.status} entity`, + 409 + ) + } + + const now = new Date() + const previousState = entity.status + const handling = this.serverHandlingForSignal(previousState, req.signal) + const txid = + handling.status === previousState + ? await this.registry.touchEntityWithTxid(entityUrl) + : await this.registry.updateStatusWithTxid(entityUrl, handling.status) + + const key = `sig-${now.getTime()}-${randomUUID().slice(0, 8)}` + const signalValue: Record = { + signal: req.signal, + status: handling.handled ? `handled` : `unhandled`, + sender: SERVER_SIGNAL_SENDER, + timestamp: now.toISOString(), + } + if (req.reason !== undefined) signalValue.reason = req.reason + if (req.payload !== undefined) signalValue.payload = req.payload + if (handling.handled) { + signalValue.handled_at = now.toISOString() + signalValue.handled_by = SERVER_SIGNAL_SENDER + signalValue.outcome = handling.outcome + signalValue.previous_state = previousState + signalValue.new_state = handling.status + } + + const signalEvent = { + type: `signal`, + key, + value: signalValue, + headers: { + operation: `insert`, + timestamp: now.toISOString(), + txid: String(txid), + }, + } + + const shouldCloseStreams = isTerminalEntityStatus(handling.status) + await this.appendSignalEvent(entity, signalEvent, shouldCloseStreams) + if (!shouldCloseStreams) { + await this.evaluateWakes(entityUrl, signalEvent) + } + + if (handling.unregisterWakes) { + await this.wakeRegistry.unregisterBySubscriber(entityUrl, this.tenantId) + await this.wakeRegistry.unregisterBySource(entityUrl, this.tenantId) + } - const txid = await this.registry.updateStatusWithTxid(entityUrl, `stopped`) - if (this.entityBridgeManager) { + if (handling.status !== previousState && this.entityBridgeManager) { await this.entityBridgeManager.onEntityChanged(entityUrl) } - // Append entity_stopped to main/error streams and close them. - const stoppedEvent = entityStateSchema.entityStopped.insert({ - key: `stopped`, - value: { - timestamp: new Date().toISOString(), - }, - } as any) - const eofData = this.encodeChangeEvent( - stoppedEvent as Record - ) + return { + url: entityUrl, + signal: req.signal, + previous_state: previousState, + new_state: handling.status, + created_at: now.getTime(), + txid, + } + } - for (const streamPath of [entity.streams.main, entity.streams.error]) { + async kill(entityUrl: string): Promise<{ txid: number }> { + const response = await this.signal(entityUrl, { + signal: `SIGKILL`, + reason: `Legacy kill command`, + }) + return { txid: response.txid } + } + + private serverHandlingForSignal( + status: ElectricAgentsEntity[`status`], + signal: EntitySignal + ): { + status: ElectricAgentsEntity[`status`] + handled: boolean + outcome: `transitioned` | `ignored` + unregisterWakes: boolean + } { + if (signal === `SIGKILL`) { + return { + status: `killed`, + handled: true, + outcome: `transitioned`, + unregisterWakes: true, + } + } + if (signal === `SIGTERM`) { + if (status === `idle` || status === `paused`) { + return { + status: `stopped`, + handled: true, + outcome: `transitioned`, + unregisterWakes: true, + } + } + if (status === `running`) { + return { + status: `stopping`, + handled: false, + outcome: `transitioned`, + unregisterWakes: false, + } + } + } + if (signal === `SIGSTOP` && (status === `idle` || status === `running`)) { + return { + status: `paused`, + handled: status === `idle`, + outcome: `transitioned`, + unregisterWakes: false, + } + } + if (signal === `SIGCONT` && status === `paused`) { + return { + status: `idle`, + handled: false, + outcome: `transitioned`, + unregisterWakes: false, + } + } + + return { + status, + handled: false, + outcome: `ignored`, + unregisterWakes: false, + } + } + + private async appendSignalEvent( + entity: ElectricAgentsEntity, + signalEvent: Record, + closeStreams: boolean + ): Promise { + const signalData = this.encodeChangeEvent(signalEvent) + if (!closeStreams) { + await this.streamClient.append(entity.streams.main, signalData) + return + } + + const errorCloseEvent = { + type: `signal`, + key: signalEvent.key, + value: signalEvent.value, + headers: signalEvent.headers, + } + const errorSignalData = this.encodeChangeEvent(errorCloseEvent) + + for (const [streamPath, data] of [ + [entity.streams.main, signalData], + [entity.streams.error, errorSignalData], + ] as const) { try { - await this.streamClient.append(streamPath, eofData, { close: true }) + await this.streamClient.append(streamPath, data, { close: true }) } catch (err) { const message = err instanceof Error ? err.message : String(err) if ( @@ -2398,8 +2572,6 @@ export class EntityManager { throw err } } - - return { txid } } // ========================================================================== @@ -2627,8 +2799,12 @@ export class EntityManager { if (!entity) { throw new ElectricAgentsError(ErrCodeNotFound, `Entity not found`, 404) } - if (entity.status === `stopped`) { - throw new ElectricAgentsError(ErrCodeNotRunning, `Entity is stopped`, 409) + if (rejectsNormalWrites(entity.status)) { + throw new ElectricAgentsError( + ErrCodeNotRunning, + `Entity is not accepting writes`, + 409 + ) } if (req.type && entity.type) { diff --git a/packages/agents-server/src/entity-registry.ts b/packages/agents-server/src/entity-registry.ts index a0c5b23426..ef8c1c1718 100644 --- a/packages/agents-server/src/entity-registry.ts +++ b/packages/agents-server/src/entity-registry.ts @@ -15,6 +15,7 @@ import { assertEntityStatus, assertRunnerAdminStatus, assertRunnerKind, + isTerminalEntityStatus, } from './electric-agents-types.js' import { DEFAULT_TENANT_ID } from './tenant.js' import type { DrizzleDB } from './db/index.js' @@ -713,10 +714,13 @@ export class PostgresRegistry { } async updateStatus(entityUrl: string, status: EntityStatus): Promise { - const whereClause = - status === `stopped` - ? this.entityWhere(entityUrl) - : and(this.entityWhere(entityUrl), ne(entities.status, `stopped`)) + const whereClause = isTerminalEntityStatus(status) + ? this.entityWhere(entityUrl) + : and( + this.entityWhere(entityUrl), + ne(entities.status, `stopped`), + ne(entities.status, `killed`) + ) await this.db .update(entities) @@ -729,10 +733,13 @@ export class PostgresRegistry { status: EntityStatus ): Promise { return await this.db.transaction(async (tx) => { - const whereClause = - status === `stopped` - ? this.entityWhere(entityUrl) - : and(this.entityWhere(entityUrl), ne(entities.status, `stopped`)) + const whereClause = isTerminalEntityStatus(status) + ? this.entityWhere(entityUrl) + : and( + this.entityWhere(entityUrl), + ne(entities.status, `stopped`), + ne(entities.status, `killed`) + ) await tx .update(entities) @@ -745,6 +752,25 @@ export class PostgresRegistry { }) } + async touchEntityWithTxid(entityUrl: string): Promise { + return await this.db.transaction(async (tx) => { + await tx + .update(entities) + .set({ updatedAt: Date.now() }) + .where( + and( + eq(entities.url, entityUrl), + ne(entities.status, `stopped`), + ne(entities.status, `killed`) + ) + ) + const result = await tx.execute( + sql`SELECT pg_current_xact_id()::xid::text AS txid` + ) + return parseInt((result[0] as { txid: string }).txid) + }) + } + async setEntityTag( url: string, key: string, diff --git a/packages/agents-server/src/index.ts b/packages/agents-server/src/index.ts index 50227c92e2..4a04a9e672 100644 --- a/packages/agents-server/src/index.ts +++ b/packages/agents-server/src/index.ts @@ -15,6 +15,14 @@ export type { SubscriptionResponse, SubscriptionStreamInfo, } from './stream-client.js' +export { + assertEntitySignal, + assertEntityStatus, + expectedSignalStatus, + isTerminalEntityStatus, + rejectsNormalWrites, + toPublicEntity, +} from './electric-agents-types.js' export type { AuthenticateRequest, ConsumerClaim, @@ -32,6 +40,18 @@ export type { RunnerLiveness, SourceStreamOffset, WakeNotificationRow, + ElectricAgentsEntity, + ElectricAgentsEntityRow, + ElectricAgentsEntityType, + EntityStatus, + EntitySignal, + PublicElectricAgentsEntity, + EntityListFilter, + RegisterEntityTypeRequest, + SendRequest, + SignalRequest, + SignalResponse, + TypedSpawnRequest, } from './electric-agents-types.js' export type { Principal, PrincipalKind } from './principal.js' export { globalRouter } from './routing/global-router.js' diff --git a/packages/agents-server/src/routing/dispatch-policy.ts b/packages/agents-server/src/routing/dispatch-policy.ts index c3c0985ae8..930bc34908 100644 --- a/packages/agents-server/src/routing/dispatch-policy.ts +++ b/packages/agents-server/src/routing/dispatch-policy.ts @@ -217,6 +217,12 @@ export async function assertDispatchPolicyAllowed( } } +export function shouldLinkDispatchBeforeInitialMessage( + policy: DispatchPolicy | undefined +): boolean { + return policy?.targets[0] !== undefined +} + export async function linkEntityDispatchSubscription( ctx: TenantContext, entity: ElectricAgentsEntity diff --git a/packages/agents-server/src/routing/entities-router.ts b/packages/agents-server/src/routing/entities-router.ts index b6c8ef3141..11a83a8fb2 100644 --- a/packages/agents-server/src/routing/entities-router.ts +++ b/packages/agents-server/src/routing/entities-router.ts @@ -8,6 +8,8 @@ import { apiError } from '../electric-agents-http.js' import { parsePrincipalKey, principalUrl } from '../principal.js' import { dispatchPolicySchema } from '../dispatch-policy-schema.js' import { + assertEntitySignal, + ErrCodeInvalidSignal, ErrCodeNotFound, ErrCodeUnknownEntityType, ErrCodeInvalidRequest, @@ -18,6 +20,7 @@ import { backfillEntityDispatchPolicy, linkEntityDispatchSubscription, resolveEffectiveDispatchPolicyForSpawn, + shouldLinkDispatchBeforeInitialMessage, unlinkEntityDispatchSubscription, } from './dispatch-policy.js' import { routeBody, withSchema } from './schema.js' @@ -133,6 +136,12 @@ const setTagBodySchema = Type.Object({ value: Type.String(), }) +const signalBodySchema = Type.Object({ + signal: Type.String(), + reason: Type.Optional(Type.String()), + payload: Type.Optional(Type.Unknown()), +}) + const scheduleBodySchema = Type.Union([ Type.Object({ scheduleType: Type.Literal(`cron`), @@ -161,6 +170,7 @@ type SendBody = Static type InboxMessageBody = Static type ForkBody = Static type SetTagBody = Static +type SignalBody = Static type ScheduleBody = Static type EntitiesRegisterBody = Static @@ -187,6 +197,12 @@ entitiesRouter.put( entitiesRouter.get(`/:type/:instanceId`, withExistingEntity, getEntity) entitiesRouter.head(`/:type/:instanceId`, withExistingEntity, headEntity) entitiesRouter.delete(`/:type/:instanceId`, withExistingEntity, killEntity) +entitiesRouter.post( + `/:type/:instanceId/signal`, + withExistingEntity, + withSchema(signalBodySchema), + signalEntity +) entitiesRouter.post( `/:type/:instanceId/send`, withExistingEntity, @@ -614,13 +630,21 @@ async function spawnEntity( wake: parsed.wake, created_by: principal.url, }) - await linkEntityDispatchSubscription(ctx, entity) + const linkBeforeInitialMessage = + parsed.initialMessage !== undefined && + shouldLinkDispatchBeforeInitialMessage(dispatchPolicy) + if (linkBeforeInitialMessage) { + await linkEntityDispatchSubscription(ctx, entity) + } if (parsed.initialMessage !== undefined) { await ctx.entityManager.send(entity.url, { from: principal.url, payload: parsed.initialMessage, }) } + if (!linkBeforeInitialMessage) { + await linkEntityDispatchSubscription(ctx, entity) + } return json( { ...toPublicEntity(entity), txid: entity.txid }, @@ -655,3 +679,31 @@ async function killEntity( ctx.runtime.claimWriteTokens.clearStream(ctx.service, entity.streams.main) return json(result) } + +async function signalEntity( + request: AgentsRouteRequest, + ctx: TenantContext +): Promise { + const parsed = routeBody(request) + const { entityUrl, entity } = requireExistingEntityRoute(request) + let signal + try { + signal = assertEntitySignal(parsed.signal) + } catch { + return apiError( + 400, + ErrCodeInvalidSignal, + `Invalid signal: ${parsed.signal}` + ) + } + const result = await ctx.entityManager.signal(entityUrl, { + signal, + reason: parsed.reason, + payload: parsed.payload, + }) + if (result.new_state === `stopped` || result.new_state === `killed`) { + await unlinkEntityDispatchSubscription(ctx, entity) + ctx.runtime.claimWriteTokens.clearStream(ctx.service, entity.streams.main) + } + return json(result) +} diff --git a/packages/agents-server/src/routing/internal-router.ts b/packages/agents-server/src/routing/internal-router.ts index caa5067ae3..1c2ecf5eaf 100644 --- a/packages/agents-server/src/routing/internal-router.ts +++ b/packages/agents-server/src/routing/internal-router.ts @@ -382,7 +382,7 @@ async function webhookForward( enrichPromise, ]) - if (entity?.status === `stopped`) { + if (entity?.status === `stopped` || entity?.status === `paused`) { if (upsertPromise) await upsertPromise return json({ done: true }) } @@ -644,14 +644,17 @@ async function callbackForward( : undefined, }) } - await ctx.entityManager.registry.updateStatus(entity.url, `idle`) + await ctx.entityManager.registry.updateStatus( + entity.url, + entity.status === `stopping` ? `stopped` : `idle` + ) ctx.runtime.claimWriteTokens.clearStream( ctx.service, target.primaryStream ) await ctx.entityBridgeManager.onEntityChanged(entity.url) serverLog.info( - `[callback-forward] status updated to idle for ${entity.url}` + `[callback-forward] status updated after done for ${entity.url}` ) } else if (stillOwnsClaim) { ctx.runtime.claimWriteTokens.clearStream( diff --git a/packages/agents-server/src/routing/runners-router.ts b/packages/agents-server/src/routing/runners-router.ts index 4a5b74a97d..648c3d0ba7 100644 --- a/packages/agents-server/src/routing/runners-router.ts +++ b/packages/agents-server/src/routing/runners-router.ts @@ -575,7 +575,7 @@ async function notificationFromClaim( 404 ) } - if (entity.status === `stopped`) { + if (entity.status === `stopped` || entity.status === `paused`) { await ctx.streamClient.releaseSubscription( input.subscriptionId, input.claim.token, diff --git a/packages/agents-server/src/runtime.ts b/packages/agents-server/src/runtime.ts index 29038e5b6c..b5d8bbef78 100644 --- a/packages/agents-server/src/runtime.ts +++ b/packages/agents-server/src/runtime.ts @@ -533,7 +533,11 @@ export class ElectricAgentsTenantRuntime { return } - await this.manager.registry.updateStatus(entityUrl, `idle`) + const entity = await this.manager.registry.getEntity(entityUrl) + await this.manager.registry.updateStatus( + entityUrl, + entity?.status === `stopping` ? `stopped` : `idle` + ) await this.entityBridgeManager.onEntityChanged(entityUrl) } } diff --git a/packages/agents-server/test/dispatch-policy-routing.test.ts b/packages/agents-server/test/dispatch-policy-routing.test.ts index 3352e85fbc..645c592f60 100644 --- a/packages/agents-server/test/dispatch-policy-routing.test.ts +++ b/packages/agents-server/test/dispatch-policy-routing.test.ts @@ -226,6 +226,38 @@ describe(`dispatch policy routing`, () => { ).toBeLessThan((ctx.entityManager.send as any).mock.invocationCallOrder[0]) }) + it(`links webhook dispatch before sending spawn initialMessage`, async () => { + const dispatchPolicy: DispatchPolicy = { + targets: [{ type: `webhook`, url: `http://runtime.local/wake` }], + } + const ctx = buildContext() + ctx.entityManager.send = vi.fn(async () => undefined) + + const response = await globalRouter.fetch( + request(`PUT`, `/_electric/entities/chat/one`, { + dispatch_policy: dispatchPolicy, + initialMessage: `hello`, + }), + ctx + ) + + expect(response.status).toBe(201) + expect(ctx.streamClient.putSubscription).toHaveBeenCalledWith( + expect.stringMatching(/^webhook:/), + expect.objectContaining({ + type: `webhook`, + streams: [`/chat/one/main`], + }) + ) + expect(ctx.entityManager.send).toHaveBeenCalledWith(`/chat/one`, { + from: `/principal/user:owner@example.com`, + payload: `hello`, + }) + expect( + (ctx.streamClient.putSubscription as any).mock.invocationCallOrder[0] + ).toBeLessThan((ctx.entityManager.send as any).mock.invocationCallOrder[0]) + }) + it(`links legacy entities through the type default before sending`, async () => { const dispatchPolicy: DispatchPolicy = { targets: [{ type: `runner`, runnerId: `runner-1` }], diff --git a/packages/agents-server/test/electric-agents-routes.test.ts b/packages/agents-server/test/electric-agents-routes.test.ts index 8153b578f4..03077040c4 100644 --- a/packages/agents-server/test/electric-agents-routes.test.ts +++ b/packages/agents-server/test/electric-agents-routes.test.ts @@ -711,6 +711,77 @@ describe(`ElectricAgentsRoutes spawn endpoint request validation`, () => { }) }) +describe(`ElectricAgentsRoutes signal endpoint`, () => { + it(`routes valid signals to the manager and returns the signal response`, async () => { + const signalResponse = { + url: `/chat/test`, + signal: `SIGINT`, + previous_state: `running`, + new_state: `running`, + created_at: 1_760_000_000_000, + txid: 123, + } + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue({ + url: `/chat/test`, + streams: { main: `/chat/test/main`, error: `/chat/test/error` }, + }), + getEntityType: vi.fn(), + }, + signal: vi.fn().mockResolvedValue(signalResponse), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/test/signal`, + { + signal: `SIGINT`, + reason: `Stop from UI`, + payload: { source: `test` }, + } + ) + + expect(manager.signal).toHaveBeenCalledWith(`/chat/test`, { + signal: `SIGINT`, + reason: `Stop from UI`, + payload: { source: `test` }, + }) + expect(response.status).toBe(200) + await expect(response.json()).resolves.toEqual(signalResponse) + }) + + it(`rejects unknown signals before calling the manager`, async () => { + const manager = { + registry: { + getEntity: vi.fn().mockResolvedValue({ + url: `/chat/test`, + streams: { main: `/chat/test/main`, error: `/chat/test/error` }, + }), + getEntityType: vi.fn(), + }, + signal: vi.fn(), + } as any + + const response = await routeResponse( + manager, + `POST`, + `/_electric/entities/chat/test/signal`, + { signal: `NOPE` } + ) + + expect(manager.signal).not.toHaveBeenCalled() + expect(response.status).toBe(400) + await expect(response.json()).resolves.toEqual({ + error: { + code: `INVALID_SIGNAL`, + message: `Invalid signal: NOPE`, + }, + }) + }) +}) + describe(`ElectricAgentsRoutes fork endpoint`, () => { it(`routes fork requests to the manager and returns public entities`, async () => { const forkedRoot = { diff --git a/packages/agents-server/test/electric-agents-status.test.ts b/packages/agents-server/test/electric-agents-status.test.ts index 1af7912a3c..2b4a17e4f5 100644 --- a/packages/agents-server/test/electric-agents-status.test.ts +++ b/packages/agents-server/test/electric-agents-status.test.ts @@ -1,22 +1,151 @@ import { describe, expect, it, vi } from 'vitest' import { EntityManager } from '../src/entity-manager' -import { assertEntityStatus } from '../src/electric-agents-types' +import { + assertEntityStatus, + rejectsNormalWrites, +} from '../src/electric-agents-types' describe(`assertEntityStatus`, () => { it(`returns valid statuses unchanged`, () => { expect(assertEntityStatus(`spawning`)).toBe(`spawning`) expect(assertEntityStatus(`running`)).toBe(`running`) expect(assertEntityStatus(`idle`)).toBe(`idle`) + expect(assertEntityStatus(`paused`)).toBe(`paused`) + expect(assertEntityStatus(`stopping`)).toBe(`stopping`) expect(assertEntityStatus(`stopped`)).toBe(`stopped`) + expect(assertEntityStatus(`killed`)).toBe(`killed`) }) it(`throws on invalid status strings`, () => { expect(() => assertEntityStatus(`active`)).toThrow(`Invalid entity status`) - expect(() => assertEntityStatus(`paused`)).toThrow(`Invalid entity status`) expect(() => assertEntityStatus(``)).toThrow(`Invalid entity status`) }) }) +describe(`signal-aware status write guards`, () => { + it(`allows paused writes but rejects stopping, stopped, or killed`, () => { + expect(rejectsNormalWrites(`spawning`)).toBe(false) + expect(rejectsNormalWrites(`running`)).toBe(false) + expect(rejectsNormalWrites(`idle`)).toBe(false) + expect(rejectsNormalWrites(`paused`)).toBe(false) + expect(rejectsNormalWrites(`stopping`)).toBe(true) + expect(rejectsNormalWrites(`stopped`)).toBe(true) + expect(rejectsNormalWrites(`killed`)).toBe(true) + }) +}) + +describe(`ElectricAgentsManager.signal semantics`, () => { + function createSignalManager(status: `running` | `idle` | `paused`) { + const entity = { + url: `/chat/demo`, + type: `chat`, + status, + streams: { + main: `/chat/demo/main`, + error: `/chat/demo/error`, + }, + subscription_id: `chat-handler`, + write_token: `token`, + tags: {}, + spawn_args: {}, + created_at: Date.now(), + updated_at: Date.now(), + } + const registry = { + tenantId: `default`, + getEntity: vi.fn().mockResolvedValue(entity), + touchEntityWithTxid: vi.fn().mockResolvedValue(101), + updateStatusWithTxid: vi.fn().mockResolvedValue(202), + updateStatus: vi.fn().mockResolvedValue(undefined), + } + const append = vi.fn().mockResolvedValue({ offset: `1` }) + const unregisterBySubscriber = vi.fn().mockResolvedValue(undefined) + const unregisterBySource = vi.fn().mockResolvedValue(undefined) + const manager = new EntityManager({ + registry: registry as any, + streamClient: { + append, + } as any, + validator: {} as any, + wakeRegistry: { + evaluate: vi.fn(() => []), + unregisterBySubscriber, + unregisterBySource, + setTimeoutCallback: vi.fn(), + setDebounceCallback: vi.fn(), + } as any, + }) + return { + manager, + registry, + append, + unregisterBySubscriber, + unregisterBySource, + } + } + + it(`keeps SIGINT as a run-local abort without changing entity status`, async () => { + const { manager, registry } = createSignalManager(`running`) + + await expect( + manager.signal(`/chat/demo`, { signal: `SIGINT` }) + ).resolves.toMatchObject({ + previous_state: `running`, + new_state: `running`, + txid: 101, + }) + + expect(registry.touchEntityWithTxid).toHaveBeenCalledWith(`/chat/demo`) + expect(registry.updateStatusWithTxid).not.toHaveBeenCalled() + }) + + it(`moves SIGSTOP to paused so existing pending work is skipped`, async () => { + const { manager, registry } = createSignalManager(`running`) + + await expect( + manager.signal(`/chat/demo`, { signal: `SIGSTOP` }) + ).resolves.toMatchObject({ + previous_state: `running`, + new_state: `paused`, + txid: 202, + }) + + expect(registry.updateStatusWithTxid).toHaveBeenCalledWith( + `/chat/demo`, + `paused` + ) + }) + + it(`moves running SIGTERM to stopping until runtime cleanup marks stopped`, async () => { + const { manager, registry } = createSignalManager(`running`) + + await expect( + manager.signal(`/chat/demo`, { signal: `SIGTERM` }) + ).resolves.toMatchObject({ + previous_state: `running`, + new_state: `stopping`, + txid: 202, + }) + + expect(registry.updateStatusWithTxid).toHaveBeenCalledWith( + `/chat/demo`, + `stopping` + ) + }) + + it(`wakes a paused entity by moving it to idle on a new message`, async () => { + const { manager, registry } = createSignalManager(`paused`) + + await manager.send(`/chat/demo`, { + from: `user`, + payload: { text: `wake up` }, + }) + + expect(registry.updateStatusWithTxid).not.toHaveBeenCalled() + expect(registry.updateStatus).toHaveBeenCalledWith(`/chat/demo`, `idle`) + }) +}) + // ============================================================================= // Finding 4: spawn() leaks orphan wake registrations on materialize failure. // ============================================================================= diff --git a/packages/agents-server/test/entity-lifecycle.test.ts b/packages/agents-server/test/entity-lifecycle.test.ts index c637ad64af..c91ecf5d9e 100644 --- a/packages/agents-server/test/entity-lifecycle.test.ts +++ b/packages/agents-server/test/entity-lifecycle.test.ts @@ -34,7 +34,7 @@ describe(`entity lifecycle`, () => { await Promise.allSettled([electricAgentsServer?.stop(), dsServer?.stop()]) }, 120_000) - it(`killed entities remain readable with stopped status`, async () => { + it(`killed entities remain readable with killed status`, async () => { const createTypeResponse = await fetch( `${baseUrl}/_electric/entity-types`, { @@ -59,9 +59,11 @@ describe(`entity lifecycle`, () => { expect(spawnResponse.status).toBe(201) const killResponse = await fetch( - `${baseUrl}/_electric/entities/task/demo-1`, + `${baseUrl}/_electric/entities/task/demo-1/signal`, { - method: `DELETE`, + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify({ signal: `SIGKILL`, reason: `test cleanup` }), } ) expect(killResponse.status).toBe(200) @@ -71,7 +73,7 @@ describe(`entity lifecycle`, () => { await expect(getResponse.json()).resolves.toMatchObject({ url: `/task/demo-1`, type: `task`, - status: `stopped`, + status: `killed`, }) const headResponse = await fetch( @@ -82,4 +84,33 @@ describe(`entity lifecycle`, () => { ) expect(headResponse.status).toBe(200) }) + + it(`keeps lifecycle DELETE as a legacy kill alias`, async () => { + await fetch(`${baseUrl}/_electric/entity-types`, { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify({ + name: `task`, + description: `Task entity`, + }), + }) + + const spawnResponse = await fetch( + `${baseUrl}/_electric/entities/task/demo-delete-rejected`, + { + method: `PUT`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify({}), + } + ) + expect(spawnResponse.status).toBe(201) + + const deleteResponse = await fetch( + `${baseUrl}/_electric/entities/task/demo-delete-rejected`, + { + method: `DELETE`, + } + ) + expect(deleteResponse.status).toBe(200) + }) }) diff --git a/packages/agents-server/test/runners-router.test.ts b/packages/agents-server/test/runners-router.test.ts index 0ddba15ade..922d2f219f 100644 --- a/packages/agents-server/test/runners-router.test.ts +++ b/packages/agents-server/test/runners-router.test.ts @@ -493,6 +493,51 @@ describe(`runner routes`, () => { ) }) + it(`releases paused entity claims without dispatching pending work`, async () => { + const ctx = buildContext() + vi.mocked(ctx.streamClient.claimSubscription).mockResolvedValue({ + wake_id: `wake-paused`, + generation: 7, + token: `claim-token`, + streams: [{ path: `chat/paused/main`, tail_offset: `12` }], + }) + vi.mocked(ctx.entityManager.registry.getEntityByStream).mockResolvedValue({ + url: `/chat/paused`, + type: `chat`, + status: `paused`, + streams: { main: `/chat/paused/main`, error: `/chat/paused/error` }, + subscription_id: `runner:runner-1`, + write_token: `entity-token`, + tags: {}, + created_at: 1, + updated_at: 1, + }) + + const response = await globalRouter.fetch( + request(`POST`, `/_electric/runners/runner-1/claim`, { + subscription_id: `runner:runner-1`, + stream: `chat/paused/main`, + generation: 7, + }), + ctx + ) + + expect(response.status).toBe(200) + await expect(response.json()).resolves.toEqual({ done: true }) + expect(ctx.streamClient.releaseSubscription).toHaveBeenCalledWith( + `runner:runner-1`, + `claim-token`, + { + wake_id: `wake-paused`, + generation: 7, + } + ) + expect( + ctx.entityManager.registry.materializeActiveClaim + ).not.toHaveBeenCalled() + expect(ctx.entityManager.registry.updateStatus).not.toHaveBeenCalled() + }) + it(`rejects invalid owner_principal with 400`, async () => { const response = await globalRouter.fetch( request(`POST`, `/_electric/runners`, { diff --git a/packages/agents-server/test/server-claim-write-token.test.ts b/packages/agents-server/test/server-claim-write-token.test.ts index 87d3b9f200..9c187920c9 100644 --- a/packages/agents-server/test/server-claim-write-token.test.ts +++ b/packages/agents-server/test/server-claim-write-token.test.ts @@ -431,9 +431,14 @@ describe(`Claim-scoped write tokens`, () => { claimWriteTokens.owns(`default`, entity.streams.main, `consumer-kill`) ).toBe(true) - const killRes = await fetch(`${baseUrl}/_electric/entities${entity.url}`, { - method: `DELETE`, - }) + const killRes = await fetch( + `${baseUrl}/_electric/entities${entity.url}/signal`, + { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify({ signal: `SIGKILL`, reason: `test cleanup` }), + } + ) expect(killRes.status).toBe(200) expect( @@ -615,9 +620,14 @@ describe(`Claim-scoped write tokens`, () => { }) expect(claim.writeToken).toBeTruthy() - const killRes = await fetch(`${baseUrl}/_electric/entities${entity.url}`, { - method: `DELETE`, - }) + const killRes = await fetch( + `${baseUrl}/_electric/entities${entity.url}/signal`, + { + method: `POST`, + headers: { 'content-type': `application/json` }, + body: JSON.stringify({ signal: `SIGKILL`, reason: `test cleanup` }), + } + ) expect(killRes.status).toBe(200) const writeRes = await appendEntityEvent({ @@ -675,6 +685,30 @@ describe(`Claim-scoped write tokens`, () => { expect(await getEntityStatus(entity.url)).toBe(`idle`) }, 20_000) + it(`done transitions SIGTERM stopping entities to stopped`, async () => { + const typeName = `done-stopping-${Date.now()}` + const entity = await createEntity(typeName, `owner`) + const registry = (electricAgentsServer as any).electricAgentsManager! + .registry + + const claim = await claimEntityConsumer({ + streamPath: entity.streams.main, + consumerId: `consumer-done-stopping`, + }) + expect(claim.writeToken).toBeTruthy() + + await registry.updateStatus(entity.url, `stopping`) + expect(await getEntityStatus(entity.url)).toBe(`stopping`) + + const done = await sendDone({ + consumerId: `consumer-done-stopping`, + epoch: 4, + streamPath: entity.streams.main, + }) + expect(done.status).toBe(200) + expect(await getEntityStatus(entity.url)).toBe(`stopped`) + }, 20_000) + it(`claim-scoped tag writes reject non-string values and support merge/delete`, async () => { const typeName = `claim-tag-semantics-${Date.now()}` const entity = await createEntity(typeName, `owner`) diff --git a/packages/electric-ax/src/index.ts b/packages/electric-ax/src/index.ts index 1cc165be7c..1053ea564e 100644 --- a/packages/electric-ax/src/index.ts +++ b/packages/electric-ax/src/index.ts @@ -609,8 +609,12 @@ async function listEntities( } async function killEntity(env: ElectricCliEnv, url: string): Promise { - const res = await electricAgentsFetch(env, entityApiPath(url), { - method: `DELETE`, + const res = await electricAgentsFetch(env, entityApiPath(url, `/signal`), { + method: `POST`, + body: JSON.stringify({ + signal: `SIGKILL`, + reason: `Killed from CLI`, + }), }) if (!res.ok) {