Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/agents-signal-stop-controls.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@electric-ax/agents-runtime': patch
'@electric-ax/agents-server': patch
'@electric-ax/agents-server-ui': patch
'@electric-ax/agents-server-conformance-tests': patch
'electric-ax': patch
---

Add durable entity signals and signal-driven stop controls for agents. The server, runtime, conformance tests, and CLI now use signal APIs, persist signal events, and let the UI send `SIGINT` to cancel active generations with pending stop feedback.
15 changes: 15 additions & 0 deletions packages/agents-runtime/src/agents-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { createEntityStreamDB } from './entity-stream-db'
import { normalizeObservationSchema } from './observation-schema'
import { createRuntimeServerClient } from './runtime-server-client'
import { appendPathToUrl } from './url'
import type { EntitySignal } from './runtime-server-client'
import type {
EntitiesObservationSource,
EntityObservationSource,
Expand All @@ -23,12 +24,26 @@ export interface AgentsClient {
observe: (
source: ObservationSource
) => Promise<EntityStreamDB | ObservationStreamDB>
signal: (options: {
entityUrl: string
signal: EntitySignal
reason?: string
payload?: unknown
}) => Promise<{ txid: number }>
kill: (entityUrl: string, reason?: string) => Promise<{ txid: number }>
}

export function createAgentsClient(config: AgentsClientConfig): AgentsClient {
const serverClient = createRuntimeServerClient(config)

return {
signal: (options) => serverClient.signalEntity(options),
kill: (entityUrl, reason) =>
serverClient.signalEntity({
entityUrl,
signal: `SIGKILL`,
reason,
}),
async observe(source) {
if (source.sourceType === `entity`) {
const info = await serverClient.getEntityInfo(
Expand Down
14 changes: 13 additions & 1 deletion packages/agents-runtime/src/context-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import type {
AgentModel,
AgentRunResult,
AgentTool,
EntitySignal,
EntityHandle,
EntityStreamDBWithActions,
HandlerContext,
Expand Down Expand Up @@ -65,6 +66,14 @@ export interface HandlerContextConfig<TState extends StateProxy = StateProxy> {
writeEvent: (event: ChangeEvent) => void
wakeSession: WakeSession
wakeEvent: WakeEvent
runSignal?: AbortSignal
registerSignalHandler?: (
handler: (signal: {
signal: EntitySignal
reason?: string
payload?: unknown
}) => void | Promise<void>
) => void
doObserve: (
source: ObservationSource,
wake?: Wake
Expand Down Expand Up @@ -396,7 +405,7 @@ export function createHandlerContext<TState extends StateProxy = StateProxy>(
)
}

await handle.run(runInput)
await handle.run(runInput, config.runSignal)
runtimeLog.info(logPrefix, `agent.run completed`)

return {
Expand Down Expand Up @@ -575,6 +584,9 @@ export function createHandlerContext<TState extends StateProxy = StateProxy>(
afterMs: opts?.afterMs,
})
},
onSignal(handler): void {
config.registerSignalHandler?.(handler)
},
recordRun(): RunHandle {
const key = nextRunKey()
let deltaCounter = 0
Expand Down
99 changes: 97 additions & 2 deletions packages/agents-runtime/src/entity-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,31 @@ type SequencedPersistedRow<T extends { key?: string | undefined }> = Omit<
_seq?: number
}
type Schema<T> = z.ZodType<T>
type ChildEntityStatusValue = `spawning` | `running` | `idle` | `stopped`
type ChildEntityStatusValue =
| `spawning`
| `running`
| `idle`
| `paused`
| `stopping`
| `stopped`
| `killed`
export type EntitySignal =
| `SIGINT`
| `SIGHUP`
| `SIGTERM`
| `SIGKILL`
| `SIGSTOP`
| `SIGCONT`
| `SIGUSR`
type SignalHandlingStatus = `unhandled` | `handled`
type SignalOutcome =
| `transitioned`
| `ignored`
| `invalid_for_state`
| `delivered`
| `aborted`
| `shutdown_requested`
| `failed`
type TagEntryValue = {
key?: string
value: string
Expand Down Expand Up @@ -162,6 +186,20 @@ type EntityStoppedValue = {
timestamp: string
reason?: string
}
type SignalValue = {
key?: string
signal: EntitySignal
status: SignalHandlingStatus
sender?: string
reason?: string
payload?: unknown
timestamp: string
handled_at?: string
handled_by?: string
outcome?: SignalOutcome
previous_state?: ChildEntityStatusValue
new_state?: ChildEntityStatusValue
}
type ChildStatusEntryValue = {
key?: string
entity_url: string
Expand Down Expand Up @@ -270,7 +308,27 @@ function createJsonObjectSchema(): Schema<Record<string, JsonValue>> {
}

function createChildEntityStatusSchema(): Schema<ChildEntityStatusValue> {
return z.enum([`spawning`, `running`, `idle`, `stopped`])
return z.enum([
`spawning`,
`running`,
`idle`,
`paused`,
`stopping`,
`stopped`,
`killed`,
])
}

function createEntitySignalSchema(): Schema<EntitySignal> {
return z.enum([
`SIGINT`,
`SIGHUP`,
`SIGTERM`,
`SIGKILL`,
`SIGSTOP`,
`SIGCONT`,
`SIGUSR`,
])
}

function createWakeChangeSchema(): Schema<WakeChangeEntryValue> {
Expand Down Expand Up @@ -437,6 +495,33 @@ function createEntityStoppedSchema(): Schema<EntityStoppedValue> {
})
}

function createSignalSchema(): Schema<SignalValue> {
return z.object({
key: z.string().optional(),
signal: createEntitySignalSchema(),
status: z.enum([`unhandled`, `handled`]),
sender: z.string().optional(),
reason: z.string().optional(),
payload: z.unknown().optional(),
timestamp: z.string(),
handled_at: z.string().optional(),
handled_by: z.string().optional(),
outcome: z
.enum([
`transitioned`,
`ignored`,
`invalid_for_state`,
`delivered`,
`aborted`,
`shutdown_requested`,
`failed`,
])
.optional(),
previous_state: createChildEntityStatusSchema().optional(),
new_state: createChildEntityStatusSchema().optional(),
})
}

function createChildStatusSchema(): Schema<ChildStatusEntryValue> {
return z.object({
key: z.string().optional(),
Expand Down Expand Up @@ -591,6 +676,7 @@ export type MessageReceived = SequencedPersistedRow<MessageReceivedValue>
export type WakeEntry = SequencedPersistedRow<WakeEntryValue>
export type EntityCreated = SequencedPersistedRow<EntityCreatedValue>
export type EntityStopped = SequencedPersistedRow<EntityStoppedValue>
export type Signal = SequencedPersistedRow<SignalValue>
export type ChildStatusEntry = SequencedPersistedRow<ChildStatusEntryValue>
export type TagEntry = SequencedPersistedRow<TagEntryValue>
export type ContextInserted = SequencedPersistedRow<ContextInsertedValue>
Expand Down Expand Up @@ -660,6 +746,7 @@ export const ENTITY_COLLECTIONS = {
wakes: `wakes`,
entityCreated: `entityCreated`,
entityStopped: `entityStopped`,
signals: `signals`,
childStatus: `childStatus`,
tags: `tags`,
manifests: `manifests`,
Expand All @@ -685,6 +772,7 @@ export const BUILT_IN_EVENT_SCHEMAS = {
createEntityCreatedSchema() as unknown as BuiltInEntitySchema<EntityCreated>,
entity_stopped:
createEntityStoppedSchema() as unknown as BuiltInEntitySchema<EntityStopped>,
signal: createSignalSchema() as unknown as BuiltInEntitySchema<Signal>,
child_status:
createChildStatusSchema() as unknown as BuiltInEntitySchema<ChildStatusEntry>,
tags: createTagEntrySchema() as unknown as BuiltInEntitySchema<TagEntry>,
Expand Down Expand Up @@ -714,6 +802,7 @@ type EntityCollectionsDefinition = {
wakes: CollectionDefinition<WakeEntry>
entityCreated: CollectionDefinition<EntityCreated>
entityStopped: CollectionDefinition<EntityStopped>
signals: CollectionDefinition<Signal>
childStatus: CollectionDefinition<ChildStatusEntry>
tags: CollectionDefinition<TagEntry>
manifests: CollectionDefinition<Manifest>
Expand Down Expand Up @@ -786,6 +875,11 @@ export const builtInCollections: EntityCollectionsDefinition = {
type: `entity_stopped`,
primaryKey: `key`,
},
signals: {
schema: BUILT_IN_EVENT_SCHEMAS.signal as StandardSchemaV1<Signal>,
type: `signal`,
primaryKey: `key`,
},
childStatus: {
schema:
BUILT_IN_EVENT_SCHEMAS.child_status as StandardSchemaV1<ChildStatusEntry>,
Expand Down Expand Up @@ -836,6 +930,7 @@ export const entityStateSchema: StateSchema<EntityCollectionsDefinition> =
/** Event types that are management/bookkeeping rather than agent content. */
const MANAGEMENT_TYPES = new Set<string>([
`entity_created`,
`signal`,
`manifest`,
`replay_watermark`,
`ack`,
Expand Down
2 changes: 2 additions & 0 deletions packages/agents-runtime/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ export type {
WakeEntry,
EntityCreated,
EntityStopped,
EntitySignal,
Signal,
ChildStatusEntry,
TagEntry,
ContextInserted as ContextInsertedEvent,
Expand Down
Loading
Loading