diff --git a/brain-bar/Sources/BrainBar/BrainBarApp.swift b/brain-bar/Sources/BrainBar/BrainBarApp.swift index ef8fd1c..4b17bea 100644 --- a/brain-bar/Sources/BrainBar/BrainBarApp.swift +++ b/brain-bar/Sources/BrainBar/BrainBarApp.swift @@ -73,50 +73,51 @@ final class AppDelegate: NSObject, NSApplicationDelegate { createLegacyStatusItem() } - DispatchQueue.global(qos: .userInitiated).async { [weak self] in - guard let self else { return } - - let dbPath = BrainBarServer.defaultDBPath() - NSLog("[BrainBar] Opening database at %@", dbPath) - let sharedDatabase = BrainDatabase(path: dbPath) - - let server = BrainBarServer(database: sharedDatabase) - server.onDatabaseReady = { [weak self] database in - Task { @MainActor in - self?.configureQuickCapture(database: database) - } + let dbPath = BrainBarServer.defaultDBPath() + NSLog("[BrainBar] Starting server before database readiness at %@", dbPath) + let server = BrainBarServer(dbPath: dbPath) + server.onStartRejected = { reason in + NSLog("[BrainBar] Startup rejected: %@", reason) + Task { @MainActor in + NSApp.terminate(nil) } - - DispatchQueue.main.async { [weak self] in + } + server.onDatabaseReady = { [weak self] database in + Task { @MainActor in guard let self else { return } - let collector = StatsCollector( - dbPath: dbPath, - daemonMonitor: DaemonHealthMonitor(targetPID: ProcessInfo.processInfo.processIdentifier) - ) - let injectionStore = try? InjectionStore(databasePath: dbPath) - - self.sharedDatabase = sharedDatabase - self.server = server - self.collector = collector - self.injectionStore = injectionStore - - server.start() - runtime.install( - collector: collector, - injectionStore: injectionStore, - database: sharedDatabase + self.sharedDatabase = database + self.configureQuickCapture(database: database) + self.runtime.install( + collector: self.collector ?? StatsCollector( + dbPath: dbPath, + daemonMonitor: DaemonHealthMonitor(targetPID: ProcessInfo.processInfo.processIdentifier) + ), + injectionStore: self.injectionStore, + database: database ) - flushPendingBrainBarURLs() + self.flushPendingBrainBarURLs() + } + } - if launchMode == .legacyStatusItem { - installLegacyMenuBarSurface(with: collector) - } + let collector = StatsCollector( + dbPath: dbPath, + daemonMonitor: DaemonHealthMonitor(targetPID: ProcessInfo.processInfo.processIdentifier) + ) + let injectionStore = try? InjectionStore(databasePath: dbPath) - collector.start() - configureQuickCaptureHotkey() - NSLog("[BrainBar] Backend ready — launchMode=%@", String(describing: launchMode)) - } + self.server = server + self.collector = collector + self.injectionStore = injectionStore + + server.start() + + if launchMode == .legacyStatusItem { + installLegacyMenuBarSurface(with: collector) } + + collector.start() + configureQuickCaptureHotkey() + NSLog("[BrainBar] Socket ready; database will self-heal — launchMode=%@", String(describing: launchMode)) } func applicationWillTerminate(_ notification: Notification) { diff --git a/brain-bar/Sources/BrainBar/BrainBarInstanceLock.swift b/brain-bar/Sources/BrainBar/BrainBarInstanceLock.swift new file mode 100644 index 0000000..b72545e --- /dev/null +++ b/brain-bar/Sources/BrainBar/BrainBarInstanceLock.swift @@ -0,0 +1,66 @@ +import Darwin +import Foundation +import os + +final class BrainBarInstanceLock: @unchecked Sendable { + enum AcquireError: Error, Equatable { + case alreadyRunning + case openFailed(String) + case lockFailed(String) + } + + private let fd: Int32 + private let lockPath: String + private let releaseLock = OSAllocatedUnfairLock(initialState: false) + + private init(fd: Int32, lockPath: String) { + self.fd = fd + self.lockPath = lockPath + } + + deinit { + release() + } + + static func acquire(lockPath: String) throws -> BrainBarInstanceLock { + let directory = URL(fileURLWithPath: lockPath).deletingLastPathComponent() + try FileManager.default.createDirectory(at: directory, withIntermediateDirectories: true) + + let fd = open(lockPath, O_RDWR | O_CREAT, 0o600) + guard fd >= 0 else { + throw AcquireError.openFailed(String(cString: strerror(errno))) + } + + guard flock(fd, LOCK_EX | LOCK_NB) == 0 else { + let lockErrno = errno + let message = String(cString: strerror(lockErrno)) + close(fd) + if lockErrno == EWOULDBLOCK { + throw AcquireError.alreadyRunning + } + throw AcquireError.lockFailed(message) + } + + var pidLine = "\(getpid())\n".data(using: .utf8) ?? Data() + ftruncate(fd, 0) + lseek(fd, 0, SEEK_SET) + _ = pidLine.withUnsafeMutableBytes { ptr in + write(fd, ptr.baseAddress, ptr.count) + } + fsync(fd) + + return BrainBarInstanceLock(fd: fd, lockPath: lockPath) + } + + func release() { + let shouldRelease = releaseLock.withLock { released -> Bool in + guard !released else { return false } + released = true + return true + } + guard shouldRelease else { return } + flock(fd, LOCK_UN) + close(fd) + try? FileManager.default.removeItem(atPath: lockPath) + } +} diff --git a/brain-bar/Sources/BrainBar/BrainBarServer.swift b/brain-bar/Sources/BrainBar/BrainBarServer.swift index a547d19..02366bd 100644 --- a/brain-bar/Sources/BrainBar/BrainBarServer.swift +++ b/brain-bar/Sources/BrainBar/BrainBarServer.swift @@ -91,6 +91,7 @@ final class BrainBarServer: @unchecked Sendable { private let dbPath: String private let providedDatabase: BrainDatabase? private let databaseRecoveryPolicy: DatabaseRecoveryPolicy + private let instanceLockPath: String private let queue = DispatchQueue(label: "com.brainlayer.brainbar.server", qos: .userInitiated) private var listenFD: Int32 = -1 private var listenSource: DispatchSourceRead? @@ -99,7 +100,10 @@ final class BrainBarServer: @unchecked Sendable { private var database: BrainDatabase! private var databaseRetryWorkItem: DispatchWorkItem? private var lastDatabaseRetryDelayMillis: UInt64? + private var databaseOpenInProgress = false + private var instanceLock: BrainBarInstanceLock? var onDatabaseReady: (@Sendable (BrainDatabase) -> Void)? + var onStartRejected: (@Sendable (String) -> Void)? /// Maximum EAGAIN retries before disconnecting a stalled client. /// Each retry sleeps 1ms, so 10 retries = 10ms max blocking the serial queue. static let maxWriteRetries = 10 @@ -137,12 +141,14 @@ final class BrainBarServer: @unchecked Sendable { socketPath: String? = nil, dbPath: String? = nil, database: BrainDatabase? = nil, - databaseRecoveryPolicy: DatabaseRecoveryPolicy = DatabaseRecoveryPolicy() + databaseRecoveryPolicy: DatabaseRecoveryPolicy = DatabaseRecoveryPolicy(), + instanceLockPath: String? = nil ) { self.socketPath = socketPath ?? Self.defaultSocketPath() self.dbPath = dbPath ?? Self.defaultDBPath() providedDatabase = database self.databaseRecoveryPolicy = databaseRecoveryPolicy + self.instanceLockPath = instanceLockPath ?? Self.defaultInstanceLockPath(socketPath: self.socketPath) } static func defaultSocketPath() -> String { @@ -162,6 +168,10 @@ final class BrainBarServer: @unchecked Sendable { return "\(home)/.local/share/brainlayer/brainlayer.db" } + static func defaultInstanceLockPath(socketPath: String = defaultSocketPath()) -> String { + "\(socketPath).lock" + } + func start() { queue.async { [weak self] in self?.startOnQueue() @@ -175,6 +185,18 @@ final class BrainBarServer: @unchecked Sendable { } private func startOnQueue() { + do { + instanceLock = try BrainBarInstanceLock.acquire(lockPath: instanceLockPath) + } catch BrainBarInstanceLock.AcquireError.alreadyRunning { + NSLog("[BrainBar] Another BrainBar instance owns %@. Exiting this server.", instanceLockPath) + onStartRejected?("another BrainBar instance owns \(instanceLockPath)") + return + } catch { + NSLog("[BrainBar] Failed to acquire single-instance lock %@: %@", instanceLockPath, String(describing: error)) + onStartRejected?("failed to acquire single-instance lock \(instanceLockPath): \(error)") + return + } + // 1. Create router FIRST (no DB dependency). // initialize + tools/list work without a database. router = MCPRouter() @@ -253,12 +275,36 @@ final class BrainBarServer: @unchecked Sendable { } private func attemptDatabaseOpen() { - guard database == nil else { return } + guard database == nil, !databaseOpenInProgress else { return } + + if let providedDatabase { + finishDatabaseOpen(providedDatabase) + return + } + + databaseOpenInProgress = true + let dbPath = self.dbPath + let busyTimeoutMillis = databaseRecoveryPolicy.busyTimeoutMillis + DispatchQueue.global(qos: .userInitiated).async { [weak self] in + let db = BrainDatabase( + path: dbPath, + openConfiguration: .init(busyTimeoutMillis: busyTimeoutMillis) + ) + self?.queue.async { [weak self] in + self?.databaseOpenInProgress = false + self?.finishDatabaseOpen(db) + } + } + } + + private func finishDatabaseOpen(_ db: BrainDatabase) { + guard listenSource != nil else { + if providedDatabase == nil { + db.close() + } + return + } - let db = providedDatabase ?? BrainDatabase( - path: dbPath, - openConfiguration: .init(busyTimeoutMillis: databaseRecoveryPolicy.busyTimeoutMillis) - ) if db.isOpen { databaseRetryWorkItem?.cancel() databaseRetryWorkItem = nil @@ -496,6 +542,9 @@ final class BrainBarServer: @unchecked Sendable { database?.close() } database = nil + databaseOpenInProgress = false + instanceLock?.release() + instanceLock = nil NSLog("[BrainBar] Server stopped") } diff --git a/brain-bar/Sources/BrainBar/BrainDatabase.swift b/brain-bar/Sources/BrainBar/BrainDatabase.swift index 4268cc1..df191af 100644 --- a/brain-bar/Sources/BrainBar/BrainDatabase.swift +++ b/brain-bar/Sources/BrainBar/BrainDatabase.swift @@ -168,6 +168,11 @@ final class BrainDatabase: @unchecked Sendable { let importance: Int } + enum StoreWriteOutcome: Sendable { + case stored(StoredChunk) + case queued + } + struct PendingStoreItem: Codable, Sendable { let content: String let tags: [String] @@ -702,7 +707,9 @@ final class BrainDatabase: @unchecked Sendable { importance: Int, source: String, queueID: String? = nil, - refreshStatistics: Bool = true + refreshStatistics: Bool = true, + retries: Int = 3, + busyTimeoutMillis: Int32? = nil ) throws -> StoredChunk { guard let db else { throw DBError.notOpen } let chunkID = "brainbar-\(UUID().uuidString.lowercased().prefix(12))" @@ -712,7 +719,17 @@ final class BrainDatabase: @unchecked Sendable { INSERT INTO chunks (id, content, metadata, source_file, tags, importance, source, content_type, char_count, preview_text) VALUES (?, ?, ?, 'brainbar-store', ?, ?, ?, 'user_message', ?, ?) """ - try runWriteStatement(on: db, sql: sql, retries: 3) { stmt in + let previousBusyTimeout = busyTimeoutMillis.flatMap { _ in queryPragma(db, name: "busy_timeout") } + if let busyTimeoutMillis { + try executeOnHandle(db, sql: "PRAGMA busy_timeout = \(max(1, busyTimeoutMillis))") + } + defer { + if let previousBusyTimeout { + try? executeOnHandle(db, sql: "PRAGMA busy_timeout = \(previousBusyTimeout)") + } + } + + try runWriteStatement(on: db, sql: sql, retries: retries) { stmt in bindText(chunkID, to: stmt, index: 1) bindText(content, to: stmt, index: 2) bindText(metadataJSON, to: stmt, index: 3) @@ -731,6 +748,33 @@ final class BrainDatabase: @unchecked Sendable { return StoredChunk(chunkID: chunkID, rowID: rowID) } + func storeOrQueueWithinBudget( + content: String, + tags: [String], + importance: Int, + source: String, + busyTimeoutMillis: Int32 = 50 + ) throws -> StoreWriteOutcome { + do { + let stored = try store( + content: content, + tags: tags, + importance: importance, + source: source, + refreshStatistics: true, + retries: 0, + busyTimeoutMillis: busyTimeoutMillis + ) + return .stored(stored) + } catch { + guard shouldQueueStoreError(error) else { + throw error + } + try queuePendingStore(content: content, tags: tags, importance: importance, source: source) + return .queued + } + } + /// Async wrapper for store() — runs DB write off the main thread. func storeAsync(content: String, tags: [String], importance: Int, source: String) async throws -> StoredChunk { try await withCheckedThrowingContinuation { continuation in diff --git a/brain-bar/Sources/BrainBar/MCPRouter.swift b/brain-bar/Sources/BrainBar/MCPRouter.swift index 665cade..208b3a9 100644 --- a/brain-bar/Sources/BrainBar/MCPRouter.swift +++ b/brain-bar/Sources/BrainBar/MCPRouter.swift @@ -285,8 +285,8 @@ final class MCPRouter: @unchecked Sendable { guard let db = database else { throw ToolError.noDatabase } - do { - let stored = try db.store(content: content, tags: tags, importance: importance, source: "mcp") + switch try db.storeOrQueueWithinBudget(content: content, tags: tags, importance: importance, source: "mcp") { + case .stored(let stored): let flushedStores = db.flushPendingStores() return ToolOutput( text: Formatters.formatStoreResult(chunkId: stored.chunkID), @@ -308,11 +308,7 @@ final class MCPRouter: @unchecked Sendable { } ] ) - } catch { - guard db.shouldQueueStoreError(error) else { - throw error - } - try db.queuePendingStore(content: content, tags: tags, importance: importance, source: "mcp") + case .queued: return ToolOutput( text: Formatters.formatStoreResult(chunkId: "", queued: true), metadata: ["queued": true] diff --git a/brain-bar/Tests/BrainBarTests/BrainBarReliabilityTests.swift b/brain-bar/Tests/BrainBarTests/BrainBarReliabilityTests.swift new file mode 100644 index 0000000..a0f276a --- /dev/null +++ b/brain-bar/Tests/BrainBarTests/BrainBarReliabilityTests.swift @@ -0,0 +1,314 @@ +import XCTest +import SQLite3 +@testable import BrainBar + +final class BrainBarReliabilityTests: XCTestCase { + private var servers: [BrainBarServer] = [] + private var tempDirectories: [URL] = [] + + override func tearDown() { + for server in servers { + server.stop() + } + servers.removeAll() + for directory in tempDirectories { + try? FileManager.default.removeItem(at: directory) + } + tempDirectories.removeAll() + super.tearDown() + } + + func testSingleInstanceLockRejectsSecondOwner() throws { + let directory = makeReliabilityTempDirectory() + let lockPath = directory.appendingPathComponent("brainbar.sock.lock").path + + let first = try BrainBarInstanceLock.acquire(lockPath: lockPath) + defer { first.release() } + + XCTAssertThrowsError(try BrainBarInstanceLock.acquire(lockPath: lockPath)) { error in + guard case BrainBarInstanceLock.AcquireError.alreadyRunning = error else { + XCTFail("Expected alreadyRunning, got \(error)") + return + } + } + } + + func testInitializeReturnsImmediatelyWhileStartupDatabaseIsWriteLocked() throws { + let directory = makeReliabilityTempDirectory() + let dbPath = directory.appendingPathComponent("brainbar.db").path + let socketPath = directory.appendingPathComponent("brainbar.sock").path + + let seededDB = BrainDatabase(path: dbPath) + XCTAssertTrue(seededDB.isOpen) + seededDB.close() + + let lockDB = try openReliabilitySQLiteConnection(path: dbPath) + defer { sqlite3_close(lockDB) } + XCTAssertEqual(sqlite3_exec(lockDB, "BEGIN IMMEDIATE", nil, nil, nil), SQLITE_OK) + var lockCommitted = false + defer { + if !lockCommitted { + sqlite3_exec(lockDB, "COMMIT", nil, nil, nil) + } + } + + let databaseReady = expectation(description: "database ready after initialize lock clears") + let server = BrainBarServer( + socketPath: socketPath, + dbPath: dbPath, + databaseRecoveryPolicy: .init( + busyTimeoutMillis: 1_000, + initialRetryDelayMillis: 25, + maximumRetryDelayMillis: 50 + ) + ) + server.onDatabaseReady = { _ in + databaseReady.fulfill() + } + servers.append(server) + server.start() + + let started = Date() + let response = try sendReliabilityMCPRequest( + to: socketPath, + request: [ + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": [ + "protocolVersion": "2024-11-05", + "capabilities": [:] as [String: Any], + "clientInfo": ["name": "reliability-test", "version": "1.0"] + ] + ], + timeout: 0.50 + ) + + XCTAssertLessThan(Date().timeIntervalSince(started), 0.35) + XCTAssertEqual((response["result"] as? [String: Any])?["protocolVersion"] as? String, "2024-11-05") + XCTAssertEqual(sqlite3_exec(lockDB, "COMMIT", nil, nil, nil), SQLITE_OK) + lockCommitted = true + wait(for: [databaseReady], timeout: 1.0) + } + + func testBrainStoreQueuesWithinBudgetWhenDatabaseWriteLockIsHeld() throws { + let directory = makeReliabilityTempDirectory() + let dbPath = directory.appendingPathComponent("brainbar.db").path + let queuePath = directory.appendingPathComponent("pending-stores.jsonl").path + setenv("BRAINBAR_PENDING_STORES_PATH", queuePath, 1) + defer { unsetenv("BRAINBAR_PENDING_STORES_PATH") } + + let db = BrainDatabase(path: dbPath) + XCTAssertTrue(db.isOpen) + defer { db.close() } + + let router = MCPRouter() + router.setDatabase(db) + + let lockDB = try openReliabilitySQLiteConnection(path: dbPath) + defer { sqlite3_close(lockDB) } + XCTAssertEqual(sqlite3_exec(lockDB, "BEGIN IMMEDIATE", nil, nil, nil), SQLITE_OK) + defer { sqlite3_exec(lockDB, "COMMIT", nil, nil, nil) } + + let started = Date() + let response = router.handle([ + "jsonrpc": "2.0", + "id": 2, + "method": "tools/call", + "params": [ + "name": "brain_store", + "arguments": [ + "content": "Queued while another writer holds the database lock", + "tags": ["reliability-test"], + "importance": 8 + ] as [String: Any] + ] as [String: Any] + ]) + + XCTAssertLessThan(Date().timeIntervalSince(started), 0.20) + let result = try XCTUnwrap(response["result"] as? [String: Any]) + XCTAssertEqual(result["queued"] as? Bool, true) + XCTAssertTrue(FileManager.default.fileExists(atPath: queuePath)) + } + + func testQueuedWriteBurstDoesNotTakeReadPathDown() throws { + let directory = makeReliabilityTempDirectory() + let dbPath = directory.appendingPathComponent("brainbar.db").path + let queuePath = directory.appendingPathComponent("pending-stores.jsonl").path + setenv("BRAINBAR_PENDING_STORES_PATH", queuePath, 1) + defer { unsetenv("BRAINBAR_PENDING_STORES_PATH") } + + let db = BrainDatabase(path: dbPath) + XCTAssertTrue(db.isOpen) + defer { db.close() } + try db.insertChunk( + id: "read-survives-write-burst", + content: "BrainBar read path should survive queued write bursts", + sessionId: "s1", + project: "reliability", + contentType: "assistant_text", + importance: 8 + ) + + let router = MCPRouter() + router.setDatabase(db) + + let lockDB = try openReliabilitySQLiteConnection(path: dbPath) + defer { sqlite3_close(lockDB) } + XCTAssertEqual(sqlite3_exec(lockDB, "BEGIN IMMEDIATE", nil, nil, nil), SQLITE_OK) + defer { sqlite3_exec(lockDB, "COMMIT", nil, nil, nil) } + + for index in 0..<5 { + let response = router.handle([ + "jsonrpc": "2.0", + "id": index, + "method": "tools/call", + "params": [ + "name": "brain_store", + "arguments": [ + "content": "Queued burst item \(index)", + "tags": ["r02-regression", "burst"], + "importance": 7 + ] as [String: Any] + ] as [String: Any] + ]) + let result = try XCTUnwrap(response["result"] as? [String: Any]) + XCTAssertEqual(result["queued"] as? Bool, true) + } + + let started = Date() + let searchResponse = router.handle([ + "jsonrpc": "2.0", + "id": 99, + "method": "tools/call", + "params": [ + "name": "brain_search", + "arguments": ["query": "read path survive", "num_results": 5] + ] as [String: Any] + ]) + + XCTAssertLessThan(Date().timeIntervalSince(started), 0.20) + let result = try XCTUnwrap(searchResponse["result"] as? [String: Any]) + XCTAssertNil(result["isError"]) + let content = try XCTUnwrap(result["content"] as? [[String: Any]]) + let text = content.compactMap { $0["text"] as? String }.joined(separator: "\n") + XCTAssertTrue(text.contains("read-survives-write-burst") || text.contains("read path should survive")) + } + + private func makeReliabilityTempDirectory() -> URL { + let shortID = UUID().uuidString.prefix(8) + let directory = URL(fileURLWithPath: "/private/tmp/br-\(shortID)", isDirectory: true) + try? FileManager.default.createDirectory(at: directory, withIntermediateDirectories: true) + tempDirectories.append(directory) + return directory + } +} + +private func openReliabilitySQLiteConnection(path: String) throws -> OpaquePointer { + var db: OpaquePointer? + let rc = sqlite3_open_v2(path, &db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_FULLMUTEX, nil) + guard rc == SQLITE_OK, let db else { + let message = db.flatMap { String(cString: sqlite3_errmsg($0)) } ?? "unknown" + if let db { + sqlite3_close(db) + } + throw NSError(domain: "BrainBarReliabilityTests", code: Int(rc), userInfo: [ + NSLocalizedDescriptionKey: "Failed to open sqlite connection: \(message)" + ]) + } + return db +} + +private func sendReliabilityMCPRequest( + to socketPath: String, + request: [String: Any], + timeout: TimeInterval +) throws -> [String: Any] { + let fd = try connectReliabilitySocket(path: socketPath, timeout: timeout) + defer { close(fd) } + + let payload = try JSONSerialization.data(withJSONObject: request) + var framed = Data("Content-Length: \(payload.count)\r\n\r\n".utf8) + framed.append(payload) + _ = framed.withUnsafeBytes { ptr in + write(fd, ptr.baseAddress, ptr.count) + } + + return try readReliabilityFramedMessage(from: fd, timeout: timeout) +} + +private func connectReliabilitySocket(path: String, timeout: TimeInterval) throws -> Int32 { + let fd = socket(AF_UNIX, SOCK_STREAM, 0) + guard fd >= 0 else { + throw NSError(domain: NSPOSIXErrorDomain, code: Int(errno)) + } + + var addr = sockaddr_un() + addr.sun_family = sa_family_t(AF_UNIX) + let pathBytes = path.utf8CString + let pathCapacity = MemoryLayout.size(ofValue: addr.sun_path) + guard pathBytes.count <= pathCapacity else { + close(fd) + throw NSError(domain: NSPOSIXErrorDomain, code: Int(ENAMETOOLONG), userInfo: [ + NSLocalizedDescriptionKey: "Socket path too long (\(pathBytes.count) > \(pathCapacity)): \(path)" + ]) + } + withUnsafeMutablePointer(to: &addr.sun_path) { ptr in + ptr.withMemoryRebound(to: CChar.self, capacity: pathCapacity) { dest in + pathBytes.withUnsafeBufferPointer { src in + _ = memcpy(dest, src.baseAddress!, src.count) + } + } + } + + let deadline = Date().addingTimeInterval(timeout) + var lastErrno = ENOENT + while Date() < deadline { + let result = withUnsafePointer(to: &addr) { addrPtr in + addrPtr.withMemoryRebound(to: sockaddr.self, capacity: 1) { ptr in + connect(fd, ptr, socklen_t(MemoryLayout.size)) + } + } + if result == 0 { + return fd + } + lastErrno = errno + if errno != ENOENT && errno != ECONNREFUSED { + break + } + Thread.sleep(forTimeInterval: 0.01) + } + + close(fd) + throw NSError(domain: NSPOSIXErrorDomain, code: Int(lastErrno)) +} + +private func readReliabilityFramedMessage(from fd: Int32, timeout: TimeInterval) throws -> [String: Any] { + let deadline = Date().addingTimeInterval(timeout) + var buffer = Data() + var readBuf = [UInt8](repeating: 0, count: 4096) + + while Date() < deadline { + let count = read(fd, &readBuf, readBuf.count) + if count > 0 { + buffer.append(contentsOf: readBuf[0..= headerRange.upperBound + contentLength { + let body = buffer[headerRange.upperBound..<(headerRange.upperBound + contentLength)] + return try JSONSerialization.jsonObject(with: body) as? [String: Any] ?? [:] + } + } else if count == 0 || (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) { + break + } + Thread.sleep(forTimeInterval: 0.01) + } + + throw NSError(domain: "BrainBarReliabilityTests", code: 2, userInfo: [ + NSLocalizedDescriptionKey: "Timed out waiting for MCP response" + ]) +}