Skip to content
11 changes: 11 additions & 0 deletions backend/src/api/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ export class APIServer {
'lishnets.getNodeInfo': _lishnets.getNodeInfo,
'lishnets.getStatus': _lishnets.getStatus,
'lishnets.infoAll': _lishnets.infoAll,
'lishnets.getBootstrapStatus': _lishnets.getBootstrapStatus,
'lishnets.getAllBootstrapStatuses': _lishnets.getAllBootstrapStatuses,
'lishnets.updateBootstrapPeers': _lishnets.updateBootstrapPeers,
// Browse network — LISH search
'search.startSearch': _search.startSearch,
'search.cancelSearch': _search.cancelSearch,
Expand Down Expand Up @@ -244,6 +247,14 @@ export class APIServer {
for (const client of this.clients) this.emit(client, 'peers:count', counts);
};

// Broadcast per-network bootstrap status updates (per-peer dial outcomes).
// Clients use the lishnets:bootstrapStatus event to surface stale-config
// warnings (configured peerID does not match actual remote identity) and
// offer remediation actions in the LISH networks settings UI.
this.networks.onBootstrapStatusChange = (networkID, status) => {
this.broadcast('lishnets:bootstrapStatus', { networkID, status });
};

const protocol = this.secure ? 'wss' : 'ws';
console.log(`[API] Token authentication ${this.apiToken ? 'enabled' : 'disabled'}`);
console.log(`[API] WebSocket server listening on ${protocol}://${this.host}:${actualPort}`);
Expand Down
23 changes: 22 additions & 1 deletion backend/src/api/lishnets.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { type Networks } from '../lishnet/lishnets.ts';
import { type DataServer } from '../lish/data-server.ts';
import { type Settings } from '../settings.ts';
import { type LISHNetworkConfig, type LISHNetworkDefinition, type SuccessResponse, type NetworkNodeInfo, type NetworkStatus, type NetworkInfo, type PeerListEntry, type PeerLishEntry, type IPeerLishDetail, type ILISH, type ImportLISHResponse, type CompressionAlgorithm, CodedError, ErrorCodes } from '@shared';
import { type LISHNetworkConfig, type LISHNetworkDefinition, type SuccessResponse, type NetworkNodeInfo, type NetworkStatus, type NetworkInfo, type PeerListEntry, type PeerLishEntry, type IPeerLishDetail, type ILISH, type ImportLISHResponse, type CompressionAlgorithm, type BootstrapStatus, CodedError, ErrorCodes } from '@shared';
import { LISHClient, LISH_PROTOCOL } from '../protocol/lish-protocol.ts';
import { Utils } from '../utils.ts';
const assert = Utils.assertParams;
Expand Down Expand Up @@ -32,6 +32,9 @@ interface LISHnetsHandlers {
getNodeInfo: () => NetworkNodeInfo | null;
getStatus: (p: { networkID: string }) => NetworkStatus;
infoAll: () => NetworkInfo[];
getBootstrapStatus: (p: { networkID: string }) => BootstrapStatus | null;
getAllBootstrapStatuses: () => BootstrapStatus[];
updateBootstrapPeers: (p: { networkID: string; bootstrapPeers: string[] }) => Promise<LISHNetworkConfig>;
}
type ImportManifestFn = (lish: ILISH, downloadPath: string, opts?: { overwrite?: boolean; enableSharing?: boolean; enableDownloading?: boolean }) => Promise<ImportLISHResponse>;

Expand Down Expand Up @@ -263,6 +266,21 @@ export function initLISHnetsHandlers(networks: Networks, dataServer: DataServer,
datasets: dataServer.getDatasets().length,
};
}
function getBootstrapStatus(p: { networkID: string }): BootstrapStatus | null {
assert(p, ['networkID']);
return networks.getBootstrapStatus(p.networkID);
}
function getAllBootstrapStatuses(): BootstrapStatus[] {
return networks.getAllBootstrapStatuses();
}
async function updateBootstrapPeers(p: { networkID: string; bootstrapPeers: string[] }): Promise<LISHNetworkConfig> {
assert(p, ['networkID', 'bootstrapPeers']);
if (!Array.isArray(p.bootstrapPeers)) throw new CodedError(ErrorCodes.INVALID_INPUT_TYPE, 'bootstrapPeers must be an array');
const updated = await networks.updateBootstrapPeers(p.networkID, p.bootstrapPeers);
if (!updated) throw new CodedError(ErrorCodes.NETWORK_NOT_FOUND, p.networkID);
broadcast('lishnets:updated', { networkID: updated.networkID });
return updated;
}
function infoAll(): NetworkInfo[] {
const configs = networks.list();
const network = networks.getNetwork();
Expand Down Expand Up @@ -308,5 +326,8 @@ export function initLISHnetsHandlers(networks: Networks, dataServer: DataServer,
getNodeInfo,
getStatus,
infoAll,
getBootstrapStatus,
getAllBootstrapStatuses,
updateBootstrapPeers,
};
}
125 changes: 123 additions & 2 deletions backend/src/api/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,24 @@ import { type Networks } from '../lishnet/lishnets.ts';
import { type Settings } from '../settings.ts';
import { lishTopic } from '../protocol/constants.ts';
import { trace } from '../logger.ts';
import { registerSearchResultHandler, unregisterSearchResultHandler, type SearchResultAnnouncement } from '../protocol/lish-protocol.ts';
import { LISH_PROTOCOL, LISHClient, registerSearchResultHandler, unregisterSearchResultHandler, type SearchResultAnnouncement } from '../protocol/lish-protocol.ts';
import type { LishSearchResult } from '@shared';

/**
* Concurrency cap for the unicast `getLishs` fallback. Each fan-out opens a
* fresh LISH protocol stream per peer; on large fleets a uncapped loop would
* burst dozens of dials at once. 10 is a balance between LAN search latency
* (sub-second for typical 5-30 peer fleets) and load on the libp2p dialer.
*/
const UNICAST_FALLBACK_PARALLEL = 10;
/**
* Already-queried peer set lives on the session so the initial snapshot
* dispatch and the live `peer:connect` listener can deduplicate against
* each other without re-asking the same peer twice. handleResult's own
* dedup catches the response side; this avoids the wasted dial.
*/
type Queried = Set<string>;

type BroadcastFn = (event: string, data: any) => void;

interface SearchSession {
Expand All @@ -15,6 +30,10 @@ interface SearchSession {
timeout: ReturnType<typeof setTimeout>;
/** Aggregated results, keyed by LISH id. New responders for the same LISH push into `peers`. */
results: Map<string, LishSearchResult>;
/** Peers we have already dispatched a unicast getLishs to. */
queried: Queried;
/** Disposer for the `peer:connect` listener, called on timeout/cancel. */
disposePeerConnect: () => void;
}

export interface SearchManager {
Expand Down Expand Up @@ -43,6 +62,7 @@ export function initSearchManager(networks: Networks, settings: Settings, broadc
const session = sessions.get(searchID);
if (!session) return;
clearTimeout(session.timeout);
session.disposePeerConnect();
unregisterSearchResultHandler(searchID);
sessions.delete(searchID);
broadcast('search:lishs:complete', { searchID, reason });
Expand Down Expand Up @@ -98,18 +118,36 @@ export function initSearchManager(networks: Networks, settings: Settings, broadc
if (query.length === 0) throw new Error('search query is empty');
const searchID = randomUUID();
const timeoutMs = settings.get('network.searchTimeout') ?? 30_000;
const network = networks.getRunningNetwork();
const selfPeerID = network.getNodeInfo()?.peerID ?? '';
const queried: Queried = new Set();
// Live listener: every peer that completes a libp2p connection while
// this search is in flight gets a unicast `getLishs(query)`. Catches
// the case where a peer appears via mDNS / peer-announce / hole-punch
// AFTER the user clicked Search but BEFORE the timeout fires, so the
// result shows up without the user having to retry.
const disposePeerConnect = network.onPeerConnect(peerID => {
if (!sessions.has(searchID)) return;
if (!peerID || peerID === selfPeerID) return;
if (queried.has(peerID)) return;
queried.add(peerID);
void queryOnePeer(searchID, query, peerID).catch(() => {
/* logged inside queryOnePeer */
});
});
const session: SearchSession = {
searchID,
query,
startedAt: Date.now(),
results: new Map(),
timeout: setTimeout(() => endSession(searchID, 'timeout'), timeoutMs),
queried,
disposePeerConnect,
};
sessions.set(searchID, session);
registerSearchResultHandler(searchID, handleResult);
// Broadcast the query on every joined network topic. If broadcast fails on a particular
// topic, log and continue — the search is still useful on other networks.
const network = networks.getRunningNetwork();
const message = { type: 'searchLishs', searchID, query };
for (const config of networks.list()) {
if (!config.enabled || !networks.isJoined(config.networkID)) continue;
Expand All @@ -119,9 +157,92 @@ export function initSearchManager(networks: Networks, settings: Settings, broadc
console.warn(`[Search] broadcast on ${config.networkID.slice(0, 8)} failed: ${err?.message ?? err}`);
}
}
// Kick off the unicast fallback in parallel with the pubsub broadcast.
// The fallback covers two windows the pubsub path leaves open:
// 1. Peer subscribed but skipped by floodPublish (NaN score, dead
// RPC stream, sparse mesh) — `getPeers()` returns them too.
// 2. Peer connected at the libp2p layer but the gossipsub SUBSCRIBE
// RPC has not yet propagated. floodPublish only iterates
// `pubsub.getSubscribers(topic)` so these peers silently miss the
// query; the unicast dial reaches them the moment the connection
// is up, independent of gossipsub state.
runUnicastFallback(session).catch(err => trace(`[Search] unicast fallback ${searchID.slice(0, 8)} failed: ${err?.message ?? err}`));
return { searchID };
}

/**
* Initial unicast fan-out at search start. Queries the union of every
* libp2p-connected peer (across all networks; we don't try to map peers
* to lishnets here — the server-side `isUploadAdvertisable` guard plus
* the optional query filter handle that on the responder). Live peers
* that connect AFTER this snapshot are picked up by the
* `peer:connect` listener installed in `startSearch`.
*/
async function runUnicastFallback(session: SearchSession): Promise<void> {
const { searchID, query, queried } = session;
const network = networks.getRunningNetwork();
const selfPeerID = network.getNodeInfo()?.peerID ?? '';
// `getPeers()` is the libp2p-connection peer set, NOT the gossipsub
// subscriber set. Includes peers freshly dialed via mDNS for whom
// gossipsub SUBSCRIBE has not yet completed — exactly the case the
// fallback exists to fix.
const peerList = network.getPeers().filter(p => p && p !== selfPeerID && !queried.has(p));
for (const p of peerList) queried.add(p);
if (peerList.length === 0) {
trace(`[Search] unicast fallback ${searchID.slice(0, 8)}: no connected peers in snapshot`);
return;
}
trace(`[Search] unicast fallback ${searchID.slice(0, 8)}: snapshot dispatching to ${peerList.length} peer(s)`);
let cursor = 0;
const workerCount = Math.min(UNICAST_FALLBACK_PARALLEL, peerList.length);
const workers = Array.from({ length: workerCount }, async () => {
for (;;) {
// Bail immediately if the session has been cancelled or timed
// out — no point opening a stream for results we will discard.
if (!sessions.has(searchID)) return;
const idx = cursor++;
if (idx >= peerList.length) return;
await queryOnePeer(searchID, query, peerList[idx]!);
}
});
await Promise.allSettled(workers);
}

async function queryOnePeer(searchID: string, query: string, peerID: string): Promise<void> {
if (!sessions.has(searchID)) return;
const network = networks.getRunningNetwork();
let client: LISHClient | undefined;
try {
const { stream } = await network.dialProtocolByPeerId(peerID, LISH_PROTOCOL);
client = new LISHClient(stream);
const lishs = await client.requestList(query);
if (!sessions.has(searchID)) return;
// Defense-in-depth: peers running an older version silently ignore
// the `query` field in our getLishs request and respond with their
// FULL advertised list. Without a client-side filter that would
// produce false-positive matches in the UI. Apply the same
// case-insensitive substring rule used by the server-side filter
// (network.ts:handleSearchLishs) so old peers behave identically
// to upgraded ones from the caller's perspective.
const q = query.toLowerCase();
const matches = lishs.filter(l => {
if (typeof l.id !== 'string') return false;
if (l.id.toLowerCase().includes(q)) return true;
return (l.name?.toLowerCase() ?? '').includes(q);
});
if (matches.length > 0) {
// Re-use the same aggregation/dedup path as the pubsub-driven
// responses, so a peer reachable through both channels never
// produces a duplicate row in the FE result list.
handleResult({ searchID, peerID, lishs: matches });
}
} catch (err: any) {
trace(`[Search] unicast getLishs to ${peerID.slice(0, 12)} failed: ${err?.message ?? err}`);
} finally {
await client?.close().catch(() => {});
}
}

function cancelSearch(p: { searchID: string }): { ok: true } {
endSession(p.searchID, 'cancel');
return { ok: true };
Expand Down
81 changes: 64 additions & 17 deletions backend/src/lishnet/lishnets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Network } from '../protocol/network.ts';
import { Utils } from '../utils.ts';
import { type DataServer } from '../lish/data-server.ts';
import { type Settings } from '../settings.ts';
import { type ILISHNetwork, type LISHNetworkConfig, type LISHNetworkDefinition, CodedError, ErrorCodes } from '@shared';
import { type ILISHNetwork, type LISHNetworkConfig, type LISHNetworkDefinition, type BootstrapStatus, CodedError, ErrorCodes } from '@shared';
import { lishnetExists, getLISHnet, listLISHnets, listEnabledLISHnets, addLISHnet, updateLISHnet, deleteLISHnet, setLISHnetEnabled, addLISHnetIfNotExists, importLISHnets, upsertLISHnet, replaceLISHnets } from '../db/lishnets.ts';

/**
Expand All @@ -19,6 +19,8 @@ export class Networks {

// Callback for peer count changes
private _onPeerCountChange: ((counts: { networkID: string; count: number }[]) => void) | null = null;
// Callback for bootstrap status changes
private _onBootstrapStatusChange: ((networkID: string, status: BootstrapStatus) => void) | null = null;

constructor(db: Database, dataDir: string, dataServer: DataServer, settings: Settings) {
this.db = db;
Expand All @@ -27,6 +29,10 @@ export class Networks {
this.network.onPeerCountChange = counts => {
if (this._onPeerCountChange) this._onPeerCountChange(counts);
};
// Forward bootstrap status changes from the network node
this.network.onBootstrapStatusChange = (networkID, status) => {
if (this._onBootstrapStatusChange) this._onBootstrapStatusChange(networkID, status);
};
}

/**
Expand All @@ -36,6 +42,14 @@ export class Networks {
this._onPeerCountChange = cb;
}

/**
* Set a callback to be called whenever the per-peer bootstrap status for
* any joined lishnet changes (dial pending → connected/error/mismatch/timeout).
*/
set onBootstrapStatusChange(cb: ((networkID: string, status: BootstrapStatus) => void) | null) {
this._onBootstrapStatusChange = cb;
}

init(): void {
console.log('✓ Networks initialized');
}
Expand All @@ -54,16 +68,23 @@ export class Networks {
async startEnabledNetworks(): Promise<void> {
const enabled = this.getEnabled();

// Collect bootstrap peers from all enabled lishnets (may be empty)
const bootstrapPeers = this.collectBootstrapPeers(enabled);

// Always start the node
await this.network.start(bootstrapPeers);
// Start the node with no preset bootstrap list — bootstrap dials happen
// per-network below via addBootstrapPeers so per-network status tracking
// can record which specific peers connected / mismatched / timed out.
// (Previous behaviour used a flat preset list that bypassed our tracking.)
await this.network.start([]);

// Subscribe to topics for all enabled lishnets
// Subscribe to topics for all enabled lishnets and dial their bootstrap peers
// with networkID context so bootstrap status counters get populated.
for (const net of enabled) {
this.network.subscribeTopic(net.networkID);
this.joinedNetworks.add(net.networkID);
if (net.bootstrapPeers.length > 0) {
// Fire-and-forget so a slow / unreachable network does not delay startup of the others.
this.network.addBootstrapPeers(net.bootstrapPeers, net.networkID, 'configured').catch(err => {
console.error(`[Networks] addBootstrapPeers for ${net.networkID} failed:`, err?.message ?? err);
});
}
console.log(`✓ Joined lishnet: ${net.name} (${net.networkID})`);
}
}
Expand Down Expand Up @@ -101,7 +122,7 @@ export class Networks {
this.joinedNetworks.add(id);

const net = this.get(id);
if (net && net.bootstrapPeers.length > 0) await this.network.addBootstrapPeers(net.bootstrapPeers);
if (net && net.bootstrapPeers.length > 0) await this.network.addBootstrapPeers(net.bootstrapPeers, id, 'configured');

console.log(`✓ Joined lishnet: ${net?.name ?? id}`);
}
Expand Down Expand Up @@ -178,15 +199,6 @@ export class Networks {
return this.network.getMeshHealth(id);
}

/**
* Collect and deduplicate bootstrap peers from a set of network configs.
*/
private collectBootstrapPeers(configs: LISHNetworkConfig[]): string[] {
const allPeers: string[] = [];
for (const config of configs) allPeers.push(...config.bootstrapPeers);
return [...new Set(allPeers)];
}

// Validate a raw network object into a LISHNetworkDefinition (without storing).
validateNetwork(data: ILISHNetwork): LISHNetworkDefinition {
if (!data.networkID || !data.name) throw new CodedError(ErrorCodes.NETWORK_INVALID);
Expand Down Expand Up @@ -271,4 +283,39 @@ export class Networks {
replace(networks: LISHNetworkConfig[]): void {
replaceLISHnets(this.db, networks);
}

/**
* Return per-peer bootstrap status for one network (or null if no dial
* attempts have been recorded since the node started or the entries were
* last updated).
*/
getBootstrapStatus(id: string): BootstrapStatus | null {
return this.network.getBootstrapStatus(id);
}

/** Return per-peer bootstrap status for every network that has any tracked dials. */
getAllBootstrapStatuses(): BootstrapStatus[] {
return this.network.getAllBootstrapStatuses();
}

/**
* Replace the bootstrap peer list for an existing network. Resets the
* per-peer status entries that are no longer present in the new list, then
* (if the network is joined) re-dials the new entries so fresh status is
* recorded. Returns the updated config or null if the network is unknown.
*/
async updateBootstrapPeers(id: string, bootstrapPeers: string[]): Promise<LISHNetworkConfig | null> {
const existing = this.get(id);
if (!existing) return null;
const cleaned = bootstrapPeers.filter(p => typeof p === 'string' && p.trim().length > 0);
const next: LISHNetworkConfig = { ...existing, bootstrapPeers: cleaned };
updateLISHnet(this.db, next);
this.network.pruneBootstrapStatus(id, cleaned);
if (this.joinedNetworks.has(id) && cleaned.length > 0) {
this.network.addBootstrapPeers(cleaned, id, 'configured').catch(err => {
console.error(`[Networks] re-dial after updateBootstrapPeers failed:`, err?.message ?? err);
});
}
return next;
}
}
Loading