Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 123 additions & 2 deletions backend/src/api/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,24 @@ import { type Networks } from '../lishnet/lishnets.ts';
import { type Settings } from '../settings.ts';
import { lishTopic } from '../protocol/constants.ts';
import { trace } from '../logger.ts';
import { registerSearchResultHandler, unregisterSearchResultHandler, type SearchResultAnnouncement } from '../protocol/lish-protocol.ts';
import { LISH_PROTOCOL, LISHClient, registerSearchResultHandler, unregisterSearchResultHandler, type SearchResultAnnouncement } from '../protocol/lish-protocol.ts';
import type { LishSearchResult } from '@shared';

/**
* Concurrency cap for the unicast `getLishs` fallback. Each fan-out opens a
* fresh LISH protocol stream per peer; on large fleets a uncapped loop would
* burst dozens of dials at once. 10 is a balance between LAN search latency
* (sub-second for typical 5-30 peer fleets) and load on the libp2p dialer.
*/
const UNICAST_FALLBACK_PARALLEL = 10;
/**
* Already-queried peer set lives on the session so the initial snapshot
* dispatch and the live `peer:connect` listener can deduplicate against
* each other without re-asking the same peer twice. handleResult's own
* dedup catches the response side; this avoids the wasted dial.
*/
type Queried = Set<string>;

type BroadcastFn = (event: string, data: any) => void;

interface SearchSession {
Expand All @@ -15,6 +30,10 @@ interface SearchSession {
timeout: ReturnType<typeof setTimeout>;
/** Aggregated results, keyed by LISH id. New responders for the same LISH push into `peers`. */
results: Map<string, LishSearchResult>;
/** Peers we have already dispatched a unicast getLishs to. */
queried: Queried;
/** Disposer for the `peer:connect` listener, called on timeout/cancel. */
disposePeerConnect: () => void;
}

export interface SearchManager {
Expand Down Expand Up @@ -43,6 +62,7 @@ export function initSearchManager(networks: Networks, settings: Settings, broadc
const session = sessions.get(searchID);
if (!session) return;
clearTimeout(session.timeout);
session.disposePeerConnect();
unregisterSearchResultHandler(searchID);
sessions.delete(searchID);
broadcast('search:lishs:complete', { searchID, reason });
Expand Down Expand Up @@ -98,18 +118,36 @@ export function initSearchManager(networks: Networks, settings: Settings, broadc
if (query.length === 0) throw new Error('search query is empty');
const searchID = randomUUID();
const timeoutMs = settings.get('network.searchTimeout') ?? 30_000;
const network = networks.getRunningNetwork();
const selfPeerID = network.getNodeInfo()?.peerID ?? '';
const queried: Queried = new Set();
// Live listener: every peer that completes a libp2p connection while
// this search is in flight gets a unicast `getLishs(query)`. Catches
// the case where a peer appears via mDNS / peer-announce / hole-punch
// AFTER the user clicked Search but BEFORE the timeout fires, so the
// result shows up without the user having to retry.
const disposePeerConnect = network.onPeerConnect(peerID => {
if (!sessions.has(searchID)) return;
if (!peerID || peerID === selfPeerID) return;
if (queried.has(peerID)) return;
queried.add(peerID);
void queryOnePeer(searchID, query, peerID).catch(() => {
/* logged inside queryOnePeer */
});
});
const session: SearchSession = {
searchID,
query,
startedAt: Date.now(),
results: new Map(),
timeout: setTimeout(() => endSession(searchID, 'timeout'), timeoutMs),
queried,
disposePeerConnect,
};
sessions.set(searchID, session);
registerSearchResultHandler(searchID, handleResult);
// Broadcast the query on every joined network topic. If broadcast fails on a particular
// topic, log and continue — the search is still useful on other networks.
const network = networks.getRunningNetwork();
const message = { type: 'searchLishs', searchID, query };
for (const config of networks.list()) {
if (!config.enabled || !networks.isJoined(config.networkID)) continue;
Expand All @@ -119,9 +157,92 @@ export function initSearchManager(networks: Networks, settings: Settings, broadc
console.warn(`[Search] broadcast on ${config.networkID.slice(0, 8)} failed: ${err?.message ?? err}`);
}
}
// Kick off the unicast fallback in parallel with the pubsub broadcast.
// The fallback covers two windows the pubsub path leaves open:
// 1. Peer subscribed but skipped by floodPublish (NaN score, dead
// RPC stream, sparse mesh) — `getPeers()` returns them too.
// 2. Peer connected at the libp2p layer but the gossipsub SUBSCRIBE
// RPC has not yet propagated. floodPublish only iterates
// `pubsub.getSubscribers(topic)` so these peers silently miss the
// query; the unicast dial reaches them the moment the connection
// is up, independent of gossipsub state.
runUnicastFallback(session).catch(err => trace(`[Search] unicast fallback ${searchID.slice(0, 8)} failed: ${err?.message ?? err}`));
return { searchID };
}

/**
* Initial unicast fan-out at search start. Queries the union of every
* libp2p-connected peer (across all networks; we don't try to map peers
* to lishnets here — the server-side `isUploadAdvertisable` guard plus
* the optional query filter handle that on the responder). Live peers
* that connect AFTER this snapshot are picked up by the
* `peer:connect` listener installed in `startSearch`.
*/
async function runUnicastFallback(session: SearchSession): Promise<void> {
const { searchID, query, queried } = session;
const network = networks.getRunningNetwork();
const selfPeerID = network.getNodeInfo()?.peerID ?? '';
// `getPeers()` is the libp2p-connection peer set, NOT the gossipsub
// subscriber set. Includes peers freshly dialed via mDNS for whom
// gossipsub SUBSCRIBE has not yet completed — exactly the case the
// fallback exists to fix.
const peerList = network.getPeers().filter(p => p && p !== selfPeerID && !queried.has(p));
for (const p of peerList) queried.add(p);
if (peerList.length === 0) {
trace(`[Search] unicast fallback ${searchID.slice(0, 8)}: no connected peers in snapshot`);
return;
}
trace(`[Search] unicast fallback ${searchID.slice(0, 8)}: snapshot dispatching to ${peerList.length} peer(s)`);
let cursor = 0;
const workerCount = Math.min(UNICAST_FALLBACK_PARALLEL, peerList.length);
const workers = Array.from({ length: workerCount }, async () => {
for (;;) {
// Bail immediately if the session has been cancelled or timed
// out — no point opening a stream for results we will discard.
if (!sessions.has(searchID)) return;
const idx = cursor++;
if (idx >= peerList.length) return;
await queryOnePeer(searchID, query, peerList[idx]!);
}
});
await Promise.allSettled(workers);
}

async function queryOnePeer(searchID: string, query: string, peerID: string): Promise<void> {
if (!sessions.has(searchID)) return;
const network = networks.getRunningNetwork();
let client: LISHClient | undefined;
try {
const { stream } = await network.dialProtocolByPeerId(peerID, LISH_PROTOCOL);
client = new LISHClient(stream);
const lishs = await client.requestList(query);
if (!sessions.has(searchID)) return;
// Defense-in-depth: peers running an older version silently ignore
// the `query` field in our getLishs request and respond with their
// FULL advertised list. Without a client-side filter that would
// produce false-positive matches in the UI. Apply the same
// case-insensitive substring rule used by the server-side filter
// (network.ts:handleSearchLishs) so old peers behave identically
// to upgraded ones from the caller's perspective.
const q = query.toLowerCase();
const matches = lishs.filter(l => {
if (typeof l.id !== 'string') return false;
if (l.id.toLowerCase().includes(q)) return true;
return (l.name?.toLowerCase() ?? '').includes(q);
});
if (matches.length > 0) {
// Re-use the same aggregation/dedup path as the pubsub-driven
// responses, so a peer reachable through both channels never
// produces a duplicate row in the FE result list.
handleResult({ searchID, peerID, lishs: matches });
}
} catch (err: any) {
trace(`[Search] unicast getLishs to ${peerID.slice(0, 12)} failed: ${err?.message ?? err}`);
} finally {
await client?.close().catch(() => {});
}
}

function cancelSearch(p: { searchID: string }): { ok: true } {
endSession(p.searchID, 'cancel');
return { ok: true };
Expand Down
26 changes: 22 additions & 4 deletions backend/src/protocol/lish-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ export interface LISHGetLishRequest {
}
export interface LISHGetLishsRequest {
type: 'getLishs';
/**
* Optional case-insensitive substring filter applied server-side to each
* LISH's id and (if present) name. When omitted the responder returns its
* full advertised list — backward-compatible with peers that predate this
* field. Used by the unicast fallback in `api/search.ts` so a search can
* succeed even when the gossipsub subscriber set has not yet propagated
* to a freshly-discovered peer (e.g. one just dialed via mDNS).
*/
query?: string;
}
/**
* Unicast "I have this LISH" announcement — response to a pubsub `want`.
Expand Down Expand Up @@ -154,9 +163,11 @@ export class LISHClient {
return response.manifest;
}

// Request list of shared LISHs from peer
async requestList(): Promise<Array<{ id: string; name?: string; totalSize?: number }>> {
const request: LISHGetLishsRequest = { type: 'getLishs' };
// Request list of shared LISHs from peer. `query` is an optional
// case-insensitive substring filter the peer applies server-side; omit it
// to retrieve the full list.
async requestList(query?: string): Promise<Array<{ id: string; name?: string; totalSize?: number }>> {
const request: LISHGetLishsRequest = { type: 'getLishs', ...(query !== undefined ? { query } : {}) };
if (!sendLengthPrefixed(this.stream, codecEncode(request))) {
throw new CodedError(ErrorCodes.PEER_UNREACHABLE, `getLishs: stream ${this.stream.status}`);
}
Expand Down Expand Up @@ -364,7 +375,14 @@ export async function handleLISHProtocol(stream: Stream, dataServer: DataServer,
// Return list of all shared (upload_enabled) LISHs — id and name only.
// Newest first — matches the order shown locally in "Download and Sharing".
const allLishs = dataServer.list();
const shared = allLishs.filter(l => isUploadAdvertisable(l.id)).reverse();
const q = typeof request.query === 'string' && request.query.length > 0 ? request.query.toLowerCase() : null;
const matches = (l: import('@shared').IStoredLISH): boolean => {
if (!q) return true;
if (l.id.toLowerCase().includes(q)) return true;
const name = l.name?.toLowerCase() ?? '';
return name.includes(q);
};
const shared = allLishs.filter(l => isUploadAdvertisable(l.id) && matches(l)).reverse();
const response: LISHGetLishsResponse = {
type: 'getLishs-result',
lishs: shared.map(l => {
Expand Down
34 changes: 34 additions & 0 deletions backend/src/protocol/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,40 @@ export class Network {
this.listeners.push({ target, event, handler });
}

/**
* Subscribe to libp2p `peer:connect` events for the duration of the
* returned disposer. The handler receives the peer ID as a string.
*
* Unlike the private `addListener`, this is intended for short-lived
* subscriptions tied to a specific operation (e.g. an in-flight LISH
* search session) — the disposer removes the listener from the global
* tracked-listener list so it does not leak across sessions. If the
* network is stopped before the caller disposes, the listener is still
* cleaned up via the normal {@link stop} path.
*/
onPeerConnect(handler: (peerID: string) => void): () => void {
if (!this.node) return () => {};
const node = this.node;
const listener = (evt: any): void => {
const pid = evt.detail?.toString?.();
if (pid) handler(pid);
};
this.addListener(node, 'peer:connect', listener);
let disposed = false;
return () => {
if (disposed) return;
disposed = true;
try {
node.removeEventListener('peer:connect', listener as any);
} catch {
// Node may already be stopped — fine, stop() walked the tracked
// list already.
}
const idx = this.listeners.findIndex(l => l.target === node && l.event === 'peer:connect' && l.handler === listener);
if (idx >= 0) this.listeners.splice(idx, 1);
};
}

/**
* Schedule a debounced check of peer counts for all subscribed topics.
*/
Expand Down
7 changes: 6 additions & 1 deletion backend/tests/unit/protocol/search-visibility.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ describe('LISH search visibility', () => {
});

it('uses the same advertisable guard for direct getLishs and getLish protocol requests', () => {
expect(LISH_PROTOCOL_TS).toContain('filter(l => isUploadAdvertisable(l.id))');
// The getLishs handler may layer additional predicates (e.g. an
// optional query filter for the unicast-search fallback), so assert
// the guard is present in the filter chain rather than matching an
// exact substring that would break on every new predicate.
const getLishsBlock = LISH_PROTOCOL_TS.slice(LISH_PROTOCOL_TS.indexOf("request.type === 'getLishs'"), LISH_PROTOCOL_TS.indexOf("request.type === 'getLish'"));
expect(getLishsBlock).toContain('isUploadAdvertisable(l.id)');
expect(LISH_PROTOCOL_TS).toContain('if (!isUploadAdvertisable(request.lishID))');
});

Expand Down