Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 91 additions & 4 deletions Sources/SYM/SymNode.swift
Original file line number Diff line number Diff line change
Expand Up @@ -942,20 +942,100 @@ public final class SymNode {

// MARK: - Multi-Transport Peer Management (MMP Section 4.6)

/// Deterministic tie-break for simultaneous-dial collisions. Both peers
/// observe the same `(localNodeId, remoteNodeId)` pair (with the local
/// and remote roles swapped on each side) and call this function with
/// their respective `newIsOutbound` value. The function picks the same
/// *physical* TCP connection on both sides — the outbound from the
/// lower nodeId, which is the inbound on the higher nodeId — so neither
/// peer needs to coordinate via a frame exchange.
///
/// Returns `true` if the new session should replace the prior one,
/// `false` if the prior should be kept and the new session disconnected.
/// Internal-access for `@testable` unit tests.
static func preferNewSessionInDualDial(
localNodeId: String,
remoteNodeId: String,
newIsOutbound: Bool
) -> Bool {
let localIsClient = localNodeId < remoteNodeId
let keepOutbound = localIsClient
return (keepOutbound == newIsOutbound)
}

/// Outcome of the addPeer state-machine. Threading note: the dict
/// mutation happens inside `peerQueue.sync`; the cleanup actions
/// (cancelling losers, sending frames, emitting events) happen after
/// the lock is released so we don't hold `peerQueue` across I/O.
private enum AddPeerOutcome {
case firstTime // never seen before — new peer-joined
case secondaryTransport // existing peer, no prior bonjour
case replacedPriorBonjour(prev: SymPeerSession) // simultaneous dial: new wins
case rejectedNew // simultaneous dial: existing wins
}

private func addPeer(_ session: SymPeerSession, nodeId: String, peerName: String, isOutbound: Bool) {
let isNew = peerQueue.sync { () -> Bool in
let outcome: AddPeerOutcome = peerQueue.sync {
if var existing = self.peers[nodeId] {
// Section 4.6: add Bonjour as secondary transport
if let prev = existing.transports["bonjour"] ?? nil {
// Simultaneous-dial collision: both peers Bonjour-discovered
// each other within ~50ms and both initiated outbound TCP. Each
// side now holds two handshaked sessions for the same nodeId
// (one outbound, one inbound). Without explicit dedup the new
// session silently overwrites the prior in the transports dict
// — the orphan keeps a live NWConnection + read loop, eventually
// fires didDisconnectWith, and `removeTransport` strips the
// surviving winner from the dict. Net effect: every connection
// dies within ~1–2s of handshake, application payloads never
// cross the wire.
//
// Tie-break deterministically by nodeId so both peers agree on
// the same physical connection without exchanging frames: the
// lower nodeId acts as client and keeps its outbound; the
// higher node keeps the matching inbound. The loser's delegate
// is detached before disconnect (see fall-through below) so its
// teardown can't ripple back through removeTransport.
let preferNew = SymNode.preferNewSessionInDualDial(
localNodeId: self.identity.nodeId,
remoteNodeId: nodeId,
newIsOutbound: isOutbound
)
if preferNew {
existing.transports["bonjour"] = session
existing.lastSeen = Date()
self.peers[nodeId] = existing
return .replacedPriorBonjour(prev: prev)
}
return .rejectedNew
}
// Section 4.6: add Bonjour as secondary transport on top of relay
existing.transports["bonjour"] = session
existing.lastSeen = Date()
self.peers[nodeId] = existing
return false
return .secondaryTransport
}
self.peers[nodeId] = PeerState(
transports: ["bonjour": session], name: peerName,
isOutbound: isOutbound, lastSeen: Date()
)
return true
return .firstTime
}

switch outcome {
case .rejectedNew:
// Detach the delegate so this session's eventual cancellation does NOT
// fire didDisconnectWith → removeTransport, which would strip the
// still-registered winner from the transports dict.
logger.info("[SYM] peer: simultaneous-dial dedup — keeping prior, cancelling redundant \(isOutbound ? "outbound" : "inbound") to \(peerName)")
session.delegate = nil
session.disconnect()
return
case .replacedPriorBonjour(let prev):
logger.info("[SYM] peer: simultaneous-dial dedup — replacing prior with new \(isOutbound ? "outbound" : "inbound") to \(peerName)")
prev.delegate = nil
prev.disconnect()
case .secondaryTransport, .firstTime:
break
}

// Handshake was already sent in sessionDidBecomeReady (on TCP connect).
Expand All @@ -964,6 +1044,13 @@ public final class SymNode {
session.send(.wakeChannel(platform: wc.platform, token: wc.token, environment: wc.environment))
}

let isNew: Bool
switch outcome {
case .firstTime: isNew = true
case .secondaryTransport, .replacedPriorBonjour: isNew = false
case .rejectedNew: return // unreachable, returned above
}

if isNew {
let knownPeers = buildPeerGossip(excluding: nodeId)
if !knownPeers.isEmpty { session.send(.peerInfo(peers: knownPeers)) }
Expand Down
84 changes: 84 additions & 0 deletions Tests/SYMTests/SYMTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -450,3 +450,87 @@ final class NodeTests: XCTestCase {
XCTAssertEqual(a.nodeId, b.nodeId, "same name should produce same nodeId")
}
}

// MARK: - Simultaneous-Dial Dedup Tests
//
// Both peers Bonjour-discover each other within ~50ms on a LAN and both
// initiate outbound TCP. The handshakes complete on both sides, so each
// node holds two sessions for the same nodeId (one outbound, one inbound).
// `preferNewSessionInDualDial` is the tie-break that lets each peer
// independently agree on which physical TCP connection survives, without
// exchanging coordination frames.
//
// The critical correctness property: for the same (A, B) pair, peer A's
// decision and peer B's decision must select the SAME physical
// connection — A's outbound is also B's inbound, so if A picks "keep my
// outbound," B must pick "keep my inbound" for that same socket pair.
//
// If both peers picked their own outbound, both would tear down the
// same socket pair from opposite ends and the connection would die.
// If both peers picked their own inbound, the symmetric problem occurs.

final class SimultaneousDialDedupTests: XCTestCase {

/// Both peers MUST agree on the same physical connection. Models the
/// dual-dial scenario for an arbitrary (A, B) pair: A's outbound is
/// the same TCP socket as B's inbound.
func testBothPeersPickSamePhysicalConnection() {
let pairs: [(String, String)] = [
("aaaa", "bbbb"), // ascii lower
("019dd87c", "019dd87d"), // realistic uuid7-prefix neighbors
("zzzz", "aaaa"), // reverse ordered
("00000000-0000-7000-8000-000000000001",
"00000000-0000-7000-8000-000000000002"), // full uuids
]

for (a, b) in pairs {
// From A's perspective: A's outbound is the same socket as B's inbound.
let aPrefersOwnOutbound = SymNode.preferNewSessionInDualDial(
localNodeId: a, remoteNodeId: b, newIsOutbound: true
)
let aPrefersOwnInbound = SymNode.preferNewSessionInDualDial(
localNodeId: a, remoteNodeId: b, newIsOutbound: false
)
// From B's perspective: B's inbound is A's outbound; B's outbound is A's inbound.
let bPrefersOwnOutbound = SymNode.preferNewSessionInDualDial(
localNodeId: b, remoteNodeId: a, newIsOutbound: true
)
let bPrefersOwnInbound = SymNode.preferNewSessionInDualDial(
localNodeId: b, remoteNodeId: a, newIsOutbound: false
)

// Tie-break is total: each peer prefers exactly one direction.
XCTAssertNotEqual(aPrefersOwnOutbound, aPrefersOwnInbound,
"A's outbound vs inbound preference must be opposite for pair (\(a), \(b))")
XCTAssertNotEqual(bPrefersOwnOutbound, bPrefersOwnInbound,
"B's outbound vs inbound preference must be opposite for pair (\(a), \(b))")

// A's outbound socket = B's inbound socket. If A keeps its outbound,
// B must keep its inbound (same socket); otherwise both peers tear
// down the same socket pair from opposite ends.
XCTAssertEqual(aPrefersOwnOutbound, bPrefersOwnInbound,
"A's outbound and B's inbound are the same socket — both peers must pick consistently for pair (\(a), \(b))")
XCTAssertEqual(aPrefersOwnInbound, bPrefersOwnOutbound,
"A's inbound and B's outbound are the same socket — both peers must pick consistently for pair (\(a), \(b))")
}
}

/// The lower-nodeId-acts-as-client convention: the lower nodeId keeps
/// its outbound. Anchors the algorithm so future refactors don't
/// silently flip the convention (which would still be correct under
/// the symmetry test above, but would interop-break against any other
/// implementation that follows MMP convention).
func testLowerNodeIdKeepsOutbound() {
// a < b, so A is "client" and keeps its outbound.
let a = "00000000-0000-7000-8000-000000000001"
let b = "00000000-0000-7000-8000-000000000002"
XCTAssertTrue(SymNode.preferNewSessionInDualDial(localNodeId: a, remoteNodeId: b, newIsOutbound: true),
"lower nodeId should keep its outbound")
XCTAssertFalse(SymNode.preferNewSessionInDualDial(localNodeId: a, remoteNodeId: b, newIsOutbound: false),
"lower nodeId should reject its inbound")
XCTAssertFalse(SymNode.preferNewSessionInDualDial(localNodeId: b, remoteNodeId: a, newIsOutbound: true),
"higher nodeId should reject its outbound")
XCTAssertTrue(SymNode.preferNewSessionInDualDial(localNodeId: b, remoteNodeId: a, newIsOutbound: false),
"higher nodeId should keep its inbound")
}
}