Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Sources/Swarm/HiveSwarm/GraphAgent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,40 @@ struct GraphAgent: AgentRuntime, Sendable {
return .invalidInput(reason: "Hive interrupt pending: \(interruptID.rawValue)")
case .noCheckpointToResume:
return .invalidInput(reason: "Hive resume requested with no checkpoint to resume.")
case let .checkpointNotFound(id):
return .invalidInput(reason: "Hive checkpoint not found: \(id.rawValue).")
case .noInterruptToResume:
return .invalidInput(reason: "Hive resume requested with no pending interrupt.")
case let .resumeInterruptMismatch(expected, found):
return .invalidInput(
reason: """
Hive resume interrupt mismatch.
expected=\(expected.rawValue), found=\(found.rawValue)
"""
)
case let .forkSourceCheckpointMissing(threadID, checkpointID):
return .internalError(
reason: """
Hive fork source checkpoint missing for thread '\(threadID.rawValue)'.
checkpointID=\(checkpointID?.rawValue ?? "nil")
"""
)
case .forkCheckpointStoreMissing:
return .internalError(reason: "Hive fork requires checkpoint store support but none is configured.")
case .forkCheckpointQueryUnsupported:
return .internalError(reason: "Hive checkpoint store does not support fork checkpoint query APIs.")
case let .forkTargetThreadConflict(threadID):
return .invalidInput(reason: "Hive fork target thread conflict: \(threadID.rawValue).")
case let .forkSchemaGraphMismatch(expectedSchema, expectedGraph, foundSchema, foundGraph):
return .internalError(
reason: """
Hive fork schema/graph mismatch.
expected(schema=\(expectedSchema), graph=\(expectedGraph))
found(schema=\(foundSchema), graph=\(foundGraph))
"""
)
case let .forkMalformedCheckpoint(field, errorDescription):
return .internalError(reason: "Hive fork checkpoint malformed at '\(field)': \(errorDescription)")

case let .unknownNodeID(nodeID):
return .internalError(reason: "Hive unknown node ID: \(nodeID.rawValue)")
Expand Down
30 changes: 29 additions & 1 deletion Sources/Swarm/HiveSwarm/RuntimeHardening.swift
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,35 @@ enum HiveDeterminism {
("runResumed", ["interruptID": interruptID.rawValue])
case .runCancelled:
("runCancelled", [:])
// Fork events removed — not available in current HiveCore version
case let .forkStarted(sourceThreadID, targetThreadID, sourceCheckpointID):
(
"forkStarted",
[
"sourceThreadID": sourceThreadID.rawValue,
"targetThreadID": targetThreadID.rawValue,
"sourceCheckpointID": sourceCheckpointID?.rawValue ?? "nil"
]
)
case let .forkCompleted(sourceThreadID, targetThreadID, sourceCheckpointID, targetCheckpointID):
(
"forkCompleted",
[
"sourceThreadID": sourceThreadID.rawValue,
"targetThreadID": targetThreadID.rawValue,
"sourceCheckpointID": sourceCheckpointID.rawValue,
"targetCheckpointID": targetCheckpointID?.rawValue ?? "nil"
]
)
case let .forkFailed(sourceThreadID, targetThreadID, sourceCheckpointID, errorCode):
(
"forkFailed",
[
"sourceThreadID": sourceThreadID.rawValue,
"targetThreadID": targetThreadID.rawValue,
"sourceCheckpointID": sourceCheckpointID?.rawValue ?? "nil",
"errorCode": errorCode
]
)
case let .stepStarted(stepIndex, frontierCount):
("stepStarted", ["stepIndex": String(stepIndex), "frontierCount": String(frontierCount)])
case let .stepFinished(stepIndex, nextFrontierCount):
Expand Down
15 changes: 15 additions & 0 deletions Sources/Swarm/Integration/Wax/WaxMemory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public actor WaxMemory: Memory, MemoryPromptDescriptor, MemorySessionLifecycle {
public func clear() async {
do {
try await store.close()
try clearPersistedStoreArtifacts()
var waxConfig = Wax.Memory.Config.default
waxConfig.enableVectorSearch = embedder != nil && configuration.enableVectorSearch

Expand Down Expand Up @@ -126,6 +127,20 @@ public actor WaxMemory: Memory, MemoryPromptDescriptor, MemorySessionLifecycle {
private var messages: [MemoryMessage] = []
private let isoFormatter = ISO8601DateFormatter()

private func clearPersistedStoreArtifacts() throws {
let fileManager = FileManager.default
let storeName = url.lastPathComponent
let parent = url.deletingLastPathComponent()

if let entries = try? fileManager.contentsOfDirectory(at: parent, includingPropertiesForKeys: nil) {
for entry in entries where entry.lastPathComponent.hasPrefix(storeName) {
try? fileManager.removeItem(at: entry)
}
} else if fileManager.fileExists(atPath: url.path) {
try? fileManager.removeItem(at: url)
}
}

private func formatRAGContext(_ rag: RAGContext, tokenLimit: Int) -> String {
guard tokenLimit > 0 else { return "" }

Expand Down
78 changes: 77 additions & 1 deletion Tests/HiveSwarmTests/ChatGraphTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ struct HiveAgentsTests {
_ = try await runControl.getCheckpointHistory(threadID: HiveThreadID("query-unsupported"), limit: 1)
}
let queryError = try #require(thrown as? HiveCheckpointQueryError)
#expect(queryError == .unsupported)
#expect(queryError == .unsupported(operation: .listCheckpoints))
}

@Test("getState returns nil for missing thread and deterministic snapshot for existing thread")
Expand Down Expand Up @@ -877,6 +877,82 @@ struct HiveAgentsTests {
#expect(firstDiff.path == "events[0].kind")
}

@Test("Determinism transcript canonicalizes fork lifecycle events")
func determinismUtilities_canonicalizesForkEvents() throws {
let runID = HiveRunID(UUID(uuidString: "00000000-0000-0000-0000-000000000111")!)
let attemptID = HiveRunAttemptID(UUID(uuidString: "00000000-0000-0000-0000-000000000222")!)
let metadata = [EventSchemaVersion.metadataKey: EventSchemaVersion.current]

let events: [HiveEvent] = [
HiveEvent(
id: HiveEventID(
runID: runID,
attemptID: attemptID,
eventIndex: 0,
stepIndex: nil,
taskOrdinal: nil
),
kind: .forkStarted(
sourceThreadID: HiveThreadID("thread-source"),
targetThreadID: HiveThreadID("thread-target"),
sourceCheckpointID: nil
),
metadata: metadata
),
HiveEvent(
id: HiveEventID(
runID: runID,
attemptID: attemptID,
eventIndex: 1,
stepIndex: nil,
taskOrdinal: nil
),
kind: .forkCompleted(
sourceThreadID: HiveThreadID("thread-source"),
targetThreadID: HiveThreadID("thread-target"),
sourceCheckpointID: HiveCheckpointID("ckpt-source"),
targetCheckpointID: HiveCheckpointID("ckpt-target")
),
metadata: metadata
),
HiveEvent(
id: HiveEventID(
runID: runID,
attemptID: attemptID,
eventIndex: 2,
stepIndex: nil,
taskOrdinal: nil
),
kind: .forkFailed(
sourceThreadID: HiveThreadID("thread-source"),
targetThreadID: HiveThreadID("thread-target"),
sourceCheckpointID: HiveCheckpointID("ckpt-source"),
errorCode: "schema_mismatch"
),
metadata: metadata
)
]

let transcript = try HiveDeterminism.projectTranscript(events)
#expect(transcript.events.count == 3)

let started = transcript.events[0]
#expect(started.kind == "forkStarted")
#expect(started.attributes["sourceThreadID"] == "thread-source")
#expect(started.attributes["targetThreadID"] == "thread-target")
#expect(started.attributes["sourceCheckpointID"] == "nil")

let completed = transcript.events[1]
#expect(completed.kind == "forkCompleted")
#expect(completed.attributes["sourceCheckpointID"] == "ckpt-source")
#expect(completed.attributes["targetCheckpointID"] == "ckpt-target")

let failed = transcript.events[2]
#expect(failed.kind == "forkFailed")
#expect(failed.attributes["sourceCheckpointID"] == "ckpt-source")
#expect(failed.attributes["errorCode"] == "schema_mismatch")
}

@Test("Cancel/checkpoint race is classified deterministically")
func cancelCheckpointRace_classifiesDeterministically() async throws {
let graph = try ChatGraph.makeToolUsingChatAgent()
Expand Down
Loading