From b8bd5d917a9b2179679cc678194ac43869ad549f Mon Sep 17 00:00:00 2001 From: LuRy Date: Tue, 12 May 2026 15:33:06 +0200 Subject: [PATCH 1/6] refactor(protocol): add optional query filter to getLishs unicast request --- backend/src/protocol/lish-protocol.ts | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/backend/src/protocol/lish-protocol.ts b/backend/src/protocol/lish-protocol.ts index 01b7a56a..8d64fa20 100644 --- a/backend/src/protocol/lish-protocol.ts +++ b/backend/src/protocol/lish-protocol.ts @@ -38,6 +38,15 @@ export interface LISHGetLishRequest { } export interface LISHGetLishsRequest { type: 'getLishs'; + /** + * Optional case-insensitive substring filter applied server-side to each + * LISH's id and (if present) name. When omitted the responder returns its + * full advertised list — backward-compatible with peers that predate this + * field. Used by the unicast fallback in `api/search.ts` so a search can + * succeed even when the gossipsub subscriber set has not yet propagated + * to a freshly-discovered peer (e.g. one just dialed via mDNS). + */ + query?: string; } /** * Unicast "I have this LISH" announcement — response to a pubsub `want`. @@ -154,9 +163,11 @@ export class LISHClient { return response.manifest; } - // Request list of shared LISHs from peer - async requestList(): Promise> { - const request: LISHGetLishsRequest = { type: 'getLishs' }; + // Request list of shared LISHs from peer. `query` is an optional + // case-insensitive substring filter the peer applies server-side; omit it + // to retrieve the full list. + async requestList(query?: string): Promise> { + const request: LISHGetLishsRequest = { type: 'getLishs', ...(query !== undefined ? { query } : {}) }; if (!sendLengthPrefixed(this.stream, codecEncode(request))) { throw new CodedError(ErrorCodes.PEER_UNREACHABLE, `getLishs: stream ${this.stream.status}`); } @@ -364,7 +375,14 @@ export async function handleLISHProtocol(stream: Stream, dataServer: DataServer, // Return list of all shared (upload_enabled) LISHs — id and name only. // Newest first — matches the order shown locally in "Download and Sharing". const allLishs = dataServer.list(); - const shared = allLishs.filter(l => isUploadAdvertisable(l.id)).reverse(); + const q = typeof request.query === 'string' && request.query.length > 0 ? request.query.toLowerCase() : null; + const matches = (l: import('@shared').IStoredLISH): boolean => { + if (!q) return true; + if (l.id.toLowerCase().includes(q)) return true; + const name = l.name?.toLowerCase() ?? ''; + return name.includes(q); + }; + const shared = allLishs.filter(l => isUploadAdvertisable(l.id) && matches(l)).reverse(); const response: LISHGetLishsResponse = { type: 'getLishs-result', lishs: shared.map(l => { From 856e7ca49169025158029cd01438a2253d6af18d Mon Sep 17 00:00:00 2001 From: LuRy Date: Tue, 12 May 2026 15:34:14 +0200 Subject: [PATCH 2/6] feat(search): unicast fallback to connected topic peers in parallel --- backend/src/api/search.ts | 81 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 80 insertions(+), 1 deletion(-) diff --git a/backend/src/api/search.ts b/backend/src/api/search.ts index 744be7c4..0fa23a55 100644 --- a/backend/src/api/search.ts +++ b/backend/src/api/search.ts @@ -3,9 +3,17 @@ import { type Networks } from '../lishnet/lishnets.ts'; import { type Settings } from '../settings.ts'; import { lishTopic } from '../protocol/constants.ts'; import { trace } from '../logger.ts'; -import { registerSearchResultHandler, unregisterSearchResultHandler, type SearchResultAnnouncement } from '../protocol/lish-protocol.ts'; +import { LISH_PROTOCOL, LISHClient, registerSearchResultHandler, unregisterSearchResultHandler, type SearchResultAnnouncement } from '../protocol/lish-protocol.ts'; import type { LishSearchResult } from '@shared'; +/** + * Concurrency cap for the unicast `getLishs` fallback. Each fan-out opens a + * fresh LISH protocol stream per peer; on large fleets a uncapped loop would + * burst dozens of dials at once. 10 is a balance between LAN search latency + * (sub-second for typical 5-30 peer fleets) and load on the libp2p dialer. + */ +const UNICAST_FALLBACK_PARALLEL = 10; + type BroadcastFn = (event: string, data: any) => void; interface SearchSession { @@ -119,9 +127,80 @@ export function initSearchManager(networks: Networks, settings: Settings, broadc console.warn(`[Search] broadcast on ${config.networkID.slice(0, 8)} failed: ${err?.message ?? err}`); } } + // Kick off the unicast fallback in parallel with the pubsub broadcast. + // floodPublish only reaches peers already in pubsub.getSubscribers(topic) + // AND scored above publishThreshold — a freshly-discovered peer (mDNS / + // peer-announce / bootstrap dial) typically has a 100-500 ms window + // after dial completes before its SUBSCRIBE RPC propagates back to us, + // during which floodPublish silently skips them. Dialing them directly + // via the LISH protocol bypasses gossipsub state entirely, so the + // search works the instant the libp2p connection is up. Fire-and-forget: + // rejections are logged but never bubble up into the FE response. + runUnicastFallback(searchID, query).catch(err => trace(`[Search] unicast fallback ${searchID.slice(0, 8)} failed: ${err?.message ?? err}`)); return { searchID }; } + /** + * Per-search unicast fan-out. Collects the union of topic-subscribed + * peers across every joined network, removes our own peerID, and sends a + * `getLishs(query)` request on a freshly-opened LISH protocol stream to + * each. Successful responses are routed through {@link handleResult}, + * which dedupes peer-id collisions against any reply we may already have + * received via the pubsub `searchResult` path. Bounded concurrency via a + * cursor-based worker pool — see {@link UNICAST_FALLBACK_PARALLEL}. + */ + async function runUnicastFallback(searchID: string, query: string): Promise { + const network = networks.getRunningNetwork(); + const selfPeerID = network.getNodeInfo()?.peerID; + const peers = new Set(); + for (const config of networks.list()) { + if (!config.enabled || !networks.isJoined(config.networkID)) continue; + for (const p of network.getTopicPeers(config.networkID)) { + if (p && p !== selfPeerID) peers.add(p); + } + } + if (peers.size === 0) { + trace(`[Search] unicast fallback ${searchID.slice(0, 8)}: no connected topic peers, skipping`); + return; + } + const peerList = [...peers]; + trace(`[Search] unicast fallback ${searchID.slice(0, 8)}: dispatching to ${peerList.length} peer(s)`); + let cursor = 0; + const workerCount = Math.min(UNICAST_FALLBACK_PARALLEL, peerList.length); + const workers = Array.from({ length: workerCount }, async () => { + for (;;) { + // Bail immediately if the session has been cancelled or timed + // out — no point opening a stream for results we will discard. + if (!sessions.has(searchID)) return; + const idx = cursor++; + if (idx >= peerList.length) return; + await queryOnePeer(searchID, query, peerList[idx]!); + } + }); + await Promise.allSettled(workers); + } + + async function queryOnePeer(searchID: string, query: string, peerID: string): Promise { + if (!sessions.has(searchID)) return; + const network = networks.getRunningNetwork(); + let client: LISHClient | undefined; + try { + const { stream } = await network.dialProtocolByPeerId(peerID, LISH_PROTOCOL); + client = new LISHClient(stream); + const lishs = await client.requestList(query); + if (sessions.has(searchID) && lishs.length > 0) { + // Re-use the same aggregation/dedup path as the pubsub-driven + // responses, so a peer reachable through both channels never + // produces a duplicate row in the FE result list. + handleResult({ searchID, peerID, lishs }); + } + } catch (err: any) { + trace(`[Search] unicast getLishs to ${peerID.slice(0, 12)} failed: ${err?.message ?? err}`); + } finally { + await client?.close().catch(() => {}); + } + } + function cancelSearch(p: { searchID: string }): { ok: true } { endSession(p.searchID, 'cancel'); return { ok: true }; From 73a3475af37b448ae9cd7097f3c8e37ede93f45f Mon Sep 17 00:00:00 2001 From: LuRy Date: Tue, 12 May 2026 15:35:53 +0200 Subject: [PATCH 3/6] test(protocol): relax getLishs guard assertion to allow additional predicates --- backend/tests/unit/protocol/search-visibility.test.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/backend/tests/unit/protocol/search-visibility.test.ts b/backend/tests/unit/protocol/search-visibility.test.ts index 0368bad4..9ec6b9ba 100644 --- a/backend/tests/unit/protocol/search-visibility.test.ts +++ b/backend/tests/unit/protocol/search-visibility.test.ts @@ -41,7 +41,12 @@ describe('LISH search visibility', () => { }); it('uses the same advertisable guard for direct getLishs and getLish protocol requests', () => { - expect(LISH_PROTOCOL_TS).toContain('filter(l => isUploadAdvertisable(l.id))'); + // The getLishs handler may layer additional predicates (e.g. an + // optional query filter for the unicast-search fallback), so assert + // the guard is present in the filter chain rather than matching an + // exact substring that would break on every new predicate. + const getLishsBlock = LISH_PROTOCOL_TS.slice(LISH_PROTOCOL_TS.indexOf("request.type === 'getLishs'"), LISH_PROTOCOL_TS.indexOf("request.type === 'getLish'")); + expect(getLishsBlock).toContain('isUploadAdvertisable(l.id)'); expect(LISH_PROTOCOL_TS).toContain('if (!isUploadAdvertisable(request.lishID))'); }); From f4092c666fec7d0de26d6c782d9e0ea33ce08ed3 Mon Sep 17 00:00:00 2001 From: LuRy Date: Tue, 12 May 2026 20:40:07 +0200 Subject: [PATCH 4/6] feat(network): add onPeerConnect helper with auto-disposing listener --- backend/src/protocol/network.ts | 34 +++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/backend/src/protocol/network.ts b/backend/src/protocol/network.ts index 7866a841..85049880 100644 --- a/backend/src/protocol/network.ts +++ b/backend/src/protocol/network.ts @@ -370,6 +370,40 @@ export class Network { this.listeners.push({ target, event, handler }); } + /** + * Subscribe to libp2p `peer:connect` events for the duration of the + * returned disposer. The handler receives the peer ID as a string. + * + * Unlike the private `addListener`, this is intended for short-lived + * subscriptions tied to a specific operation (e.g. an in-flight LISH + * search session) — the disposer removes the listener from the global + * tracked-listener list so it does not leak across sessions. If the + * network is stopped before the caller disposes, the listener is still + * cleaned up via the normal {@link stop} path. + */ + onPeerConnect(handler: (peerID: string) => void): () => void { + if (!this.node) return () => {}; + const node = this.node; + const listener = (evt: any): void => { + const pid = evt.detail?.toString?.(); + if (pid) handler(pid); + }; + this.addListener(node, 'peer:connect', listener); + let disposed = false; + return () => { + if (disposed) return; + disposed = true; + try { + node.removeEventListener('peer:connect', listener as any); + } catch { + // Node may already be stopped — fine, stop() walked the tracked + // list already. + } + const idx = this.listeners.findIndex(l => l.target === node && l.event === 'peer:connect' && l.handler === listener); + if (idx >= 0) this.listeners.splice(idx, 1); + }; + } + /** * Schedule a debounced check of peer counts for all subscribed topics. */ From d31e90f88063e220832f52d8d859c33c5a61f6e0 Mon Sep 17 00:00:00 2001 From: LuRy Date: Tue, 12 May 2026 20:41:16 +0200 Subject: [PATCH 5/6] feat(search): broaden unicast fallback to all libp2p peers and react to peer:connect --- backend/src/api/search.ts | 88 ++++++++++++++++++++++++++------------- 1 file changed, 58 insertions(+), 30 deletions(-) diff --git a/backend/src/api/search.ts b/backend/src/api/search.ts index 0fa23a55..8a968feb 100644 --- a/backend/src/api/search.ts +++ b/backend/src/api/search.ts @@ -13,6 +13,13 @@ import type { LishSearchResult } from '@shared'; * (sub-second for typical 5-30 peer fleets) and load on the libp2p dialer. */ const UNICAST_FALLBACK_PARALLEL = 10; +/** + * Already-queried peer set lives on the session so the initial snapshot + * dispatch and the live `peer:connect` listener can deduplicate against + * each other without re-asking the same peer twice. handleResult's own + * dedup catches the response side; this avoids the wasted dial. + */ +type Queried = Set; type BroadcastFn = (event: string, data: any) => void; @@ -23,6 +30,10 @@ interface SearchSession { timeout: ReturnType; /** Aggregated results, keyed by LISH id. New responders for the same LISH push into `peers`. */ results: Map; + /** Peers we have already dispatched a unicast getLishs to. */ + queried: Queried; + /** Disposer for the `peer:connect` listener, called on timeout/cancel. */ + disposePeerConnect: () => void; } export interface SearchManager { @@ -51,6 +62,7 @@ export function initSearchManager(networks: Networks, settings: Settings, broadc const session = sessions.get(searchID); if (!session) return; clearTimeout(session.timeout); + session.disposePeerConnect(); unregisterSearchResultHandler(searchID); sessions.delete(searchID); broadcast('search:lishs:complete', { searchID, reason }); @@ -106,18 +118,36 @@ export function initSearchManager(networks: Networks, settings: Settings, broadc if (query.length === 0) throw new Error('search query is empty'); const searchID = randomUUID(); const timeoutMs = settings.get('network.searchTimeout') ?? 30_000; + const network = networks.getRunningNetwork(); + const selfPeerID = network.getNodeInfo()?.peerID ?? ''; + const queried: Queried = new Set(); + // Live listener: every peer that completes a libp2p connection while + // this search is in flight gets a unicast `getLishs(query)`. Catches + // the case where a peer appears via mDNS / peer-announce / hole-punch + // AFTER the user clicked Search but BEFORE the timeout fires, so the + // result shows up without the user having to retry. + const disposePeerConnect = network.onPeerConnect(peerID => { + if (!sessions.has(searchID)) return; + if (!peerID || peerID === selfPeerID) return; + if (queried.has(peerID)) return; + queried.add(peerID); + void queryOnePeer(searchID, query, peerID).catch(() => { + /* logged inside queryOnePeer */ + }); + }); const session: SearchSession = { searchID, query, startedAt: Date.now(), results: new Map(), timeout: setTimeout(() => endSession(searchID, 'timeout'), timeoutMs), + queried, + disposePeerConnect, }; sessions.set(searchID, session); registerSearchResultHandler(searchID, handleResult); // Broadcast the query on every joined network topic. If broadcast fails on a particular // topic, log and continue — the search is still useful on other networks. - const network = networks.getRunningNetwork(); const message = { type: 'searchLishs', searchID, query }; for (const config of networks.list()) { if (!config.enabled || !networks.isJoined(config.networkID)) continue; @@ -128,43 +158,41 @@ export function initSearchManager(networks: Networks, settings: Settings, broadc } } // Kick off the unicast fallback in parallel with the pubsub broadcast. - // floodPublish only reaches peers already in pubsub.getSubscribers(topic) - // AND scored above publishThreshold — a freshly-discovered peer (mDNS / - // peer-announce / bootstrap dial) typically has a 100-500 ms window - // after dial completes before its SUBSCRIBE RPC propagates back to us, - // during which floodPublish silently skips them. Dialing them directly - // via the LISH protocol bypasses gossipsub state entirely, so the - // search works the instant the libp2p connection is up. Fire-and-forget: - // rejections are logged but never bubble up into the FE response. - runUnicastFallback(searchID, query).catch(err => trace(`[Search] unicast fallback ${searchID.slice(0, 8)} failed: ${err?.message ?? err}`)); + // The fallback covers two windows the pubsub path leaves open: + // 1. Peer subscribed but skipped by floodPublish (NaN score, dead + // RPC stream, sparse mesh) — `getPeers()` returns them too. + // 2. Peer connected at the libp2p layer but the gossipsub SUBSCRIBE + // RPC has not yet propagated. floodPublish only iterates + // `pubsub.getSubscribers(topic)` so these peers silently miss the + // query; the unicast dial reaches them the moment the connection + // is up, independent of gossipsub state. + runUnicastFallback(session).catch(err => trace(`[Search] unicast fallback ${searchID.slice(0, 8)} failed: ${err?.message ?? err}`)); return { searchID }; } /** - * Per-search unicast fan-out. Collects the union of topic-subscribed - * peers across every joined network, removes our own peerID, and sends a - * `getLishs(query)` request on a freshly-opened LISH protocol stream to - * each. Successful responses are routed through {@link handleResult}, - * which dedupes peer-id collisions against any reply we may already have - * received via the pubsub `searchResult` path. Bounded concurrency via a - * cursor-based worker pool — see {@link UNICAST_FALLBACK_PARALLEL}. + * Initial unicast fan-out at search start. Queries the union of every + * libp2p-connected peer (across all networks; we don't try to map peers + * to lishnets here — the server-side `isUploadAdvertisable` guard plus + * the optional query filter handle that on the responder). Live peers + * that connect AFTER this snapshot are picked up by the + * `peer:connect` listener installed in `startSearch`. */ - async function runUnicastFallback(searchID: string, query: string): Promise { + async function runUnicastFallback(session: SearchSession): Promise { + const { searchID, query, queried } = session; const network = networks.getRunningNetwork(); - const selfPeerID = network.getNodeInfo()?.peerID; - const peers = new Set(); - for (const config of networks.list()) { - if (!config.enabled || !networks.isJoined(config.networkID)) continue; - for (const p of network.getTopicPeers(config.networkID)) { - if (p && p !== selfPeerID) peers.add(p); - } - } - if (peers.size === 0) { - trace(`[Search] unicast fallback ${searchID.slice(0, 8)}: no connected topic peers, skipping`); + const selfPeerID = network.getNodeInfo()?.peerID ?? ''; + // `getPeers()` is the libp2p-connection peer set, NOT the gossipsub + // subscriber set. Includes peers freshly dialed via mDNS for whom + // gossipsub SUBSCRIBE has not yet completed — exactly the case the + // fallback exists to fix. + const peerList = network.getPeers().filter(p => p && p !== selfPeerID && !queried.has(p)); + for (const p of peerList) queried.add(p); + if (peerList.length === 0) { + trace(`[Search] unicast fallback ${searchID.slice(0, 8)}: no connected peers in snapshot`); return; } - const peerList = [...peers]; - trace(`[Search] unicast fallback ${searchID.slice(0, 8)}: dispatching to ${peerList.length} peer(s)`); + trace(`[Search] unicast fallback ${searchID.slice(0, 8)}: snapshot dispatching to ${peerList.length} peer(s)`); let cursor = 0; const workerCount = Math.min(UNICAST_FALLBACK_PARALLEL, peerList.length); const workers = Array.from({ length: workerCount }, async () => { From 7c878807749f60ac6db75d2b59fb1a66337228d2 Mon Sep 17 00:00:00 2001 From: LuRy Date: Tue, 12 May 2026 20:59:20 +0200 Subject: [PATCH 6/6] fix(search): client-side filter for getLishs to handle peers ignoring query field --- backend/src/api/search.ts | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/backend/src/api/search.ts b/backend/src/api/search.ts index 8a968feb..9b2e81ae 100644 --- a/backend/src/api/search.ts +++ b/backend/src/api/search.ts @@ -216,11 +216,25 @@ export function initSearchManager(networks: Networks, settings: Settings, broadc const { stream } = await network.dialProtocolByPeerId(peerID, LISH_PROTOCOL); client = new LISHClient(stream); const lishs = await client.requestList(query); - if (sessions.has(searchID) && lishs.length > 0) { + if (!sessions.has(searchID)) return; + // Defense-in-depth: peers running an older version silently ignore + // the `query` field in our getLishs request and respond with their + // FULL advertised list. Without a client-side filter that would + // produce false-positive matches in the UI. Apply the same + // case-insensitive substring rule used by the server-side filter + // (network.ts:handleSearchLishs) so old peers behave identically + // to upgraded ones from the caller's perspective. + const q = query.toLowerCase(); + const matches = lishs.filter(l => { + if (typeof l.id !== 'string') return false; + if (l.id.toLowerCase().includes(q)) return true; + return (l.name?.toLowerCase() ?? '').includes(q); + }); + if (matches.length > 0) { // Re-use the same aggregation/dedup path as the pubsub-driven // responses, so a peer reachable through both channels never // produces a duplicate row in the FE result list. - handleResult({ searchID, peerID, lishs }); + handleResult({ searchID, peerID, lishs: matches }); } } catch (err: any) { trace(`[Search] unicast getLishs to ${peerID.slice(0, 12)} failed: ${err?.message ?? err}`);