Skip to content

Commit e51e6c3

Browse files
telnet up/down recovery
1 parent e47aaf0 commit e51e6c3

6 files changed

Lines changed: 191 additions & 90 deletions

File tree

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "ollieos",
3-
"version": "2.3.2",
3+
"version": "2.3.3",
44
"description": "",
55
"main": "server.js",
66
"scripts": {

src/kernel/network.ts

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
export type NetworkStateChangeListener = (is_up: boolean) => void | Promise<void>;
2+
13
export type SocketDataListener = (data: Uint8Array) => void | Promise<void>;
24
export type SocketCloseListener = () => void | Promise<void>;
35

46
export type UserspaceSocketConnectionListener = (client: UserspaceClientSocket) => void | Promise<void>;
57
export type SocketConnectionListener = (client: AbstractClientSocket) => void | Promise<void>;
68

9+
export type NetworkManagerEvent = "state_change";
710
export type ClientSocketEvent = "data" | "close";
811
export type ServerSocketEvent = "connection" | "close";
912

@@ -15,6 +18,8 @@ export type UserspaceClientSocketEventListener = SocketDataListener | SocketClos
1518
export type UserspaceServerSocketEventListener = UserspaceSocketConnectionListener | SocketCloseListener;
1619
export type UserspaceSocketEventListener = UserspaceClientSocketEventListener | UserspaceServerSocketEventListener;
1720

21+
export type NetworkManagerEventListener = NetworkStateChangeListener;
22+
1823
// TODO: doc this
1924
// in short impls need to care about data flow and ensuring that errors in listeners dont crash anything
2025

@@ -282,44 +287,64 @@ export interface UserspaceNetworkManager {
282287
is_up(try_waiting?: boolean): Promise<boolean>;
283288

284289
// listen and connect must be done via the pcb in userspace to track ownership
290+
// same for adding event listeners to the manager, as we want the listeners to die at the end of the process
291+
// doesn't apply to listeners on the socket because the whole socket is killed at the end of the process, so no risk of leaking listeners there
285292
}
286293

287294
export abstract class AbstractNetworkManager {
288-
readonly #port_map: Map<number, AbstractServerSocket> = new Map();
295+
protected _port_map: Map<number, AbstractServerSocket> = new Map();
289296

290297
get bound_ports(): number[] {
291-
return Array.from(this.#port_map.keys());
298+
return Array.from(this._port_map.keys());
292299
}
293300

294301
abstract get_unique_manager_type_name(): string;
295302

296303
abstract is_up(try_waiting?: boolean): Promise<boolean>;
297304

305+
protected _state_change_listeners: Set<NetworkStateChangeListener> = new Set();
306+
307+
add_event_listener(event: NetworkManagerEvent, callback: NetworkManagerEventListener): void {
308+
switch (event) {
309+
case "state_change":
310+
this._state_change_listeners.add(callback);
311+
break;
312+
}
313+
}
314+
315+
remove_event_listener(event: NetworkManagerEvent, callback: NetworkManagerEventListener): void {
316+
switch (event) {
317+
case "state_change":
318+
this._state_change_listeners.delete(callback);
319+
break;
320+
}
321+
}
322+
298323
protected abstract _handle_listen_internal(port: number): Promise<AbstractServerSocket>;
299324

300325
// listen covers both binding and listening
301326
async listen(port: number): Promise<AbstractServerSocket> {
302-
if (this.#port_map.has(port)) {
327+
if (this._port_map.has(port)) {
303328
throw new PortInUseError(port);
304329
}
305330

306331
// forcibly claim the port to avoid race condition
307-
this.#port_map.set(port, null as unknown as AbstractServerSocket);
332+
this._port_map.set(port, null as unknown as AbstractServerSocket);
308333

309334
try {
310335
// invoke implementation class to create the server socket, then store it in the port map
311336
const server_socket = await this._handle_listen_internal(port);
312-
this.#port_map.set(port, server_socket);
337+
this._port_map.set(port, server_socket);
313338

314339
// remove once the server socket is closed
315340
server_socket.add_event_listener("close", () => {
316-
this.#port_map.delete(port);
341+
this._port_map.delete(port);
317342
});
318343

319344
return server_socket;
320345
} catch (err) {
321346
// if there was an error, remove the port from the map so it can be retried
322-
this.#port_map.delete(port);
347+
this._port_map.delete(port);
323348
throw err;
324349
}
325350
}

src/kernel/processes.ts

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type {ParsedCommandLine} from "./index";
44
import {
55
AbstractClientSocket,
66
AbstractNetworkManager,
7-
AbstractServerSocket,
7+
AbstractServerSocket, NetworkManagerEvent, NetworkManagerEventListener,
88
UserspaceClientSocket,
99
UserspaceServerSocket
1010
} from "./network";
@@ -321,6 +321,8 @@ export interface UserspaceProcessContext extends UserspaceOtherProcessContext {
321321
create_window(): AbstractWindow | null;
322322
network_listen(port: number): Promise<UserspaceServerSocket>;
323323
network_connect(host: string, port: number): Promise<UserspaceClientSocket>;
324+
network_add_manager_listener(event: NetworkManagerEvent, listener: NetworkManagerEventListener): void;
325+
network_remove_manager_listener(event: NetworkManagerEvent, listener: NetworkManagerEventListener): void;
324326
get bound_ports(): number[];
325327
}
326328

@@ -348,6 +350,7 @@ export class ProcessContext {
348350

349351
readonly #port_map: Map<number, AbstractServerSocket> = new Map();
350352
readonly #network_clients: Set<AbstractClientSocket> = new Set();
353+
readonly #network_manager_listeners: Map<NetworkManagerEvent, Set<NetworkManagerEventListener>> = new Map();
351354

352355
constructor(pid: number, terminal: AbstractTerminal, source_command: ParsedCommandLine, registry: ProcessManager, shell?: AbstractShell) {
353356
this.#pid = pid;
@@ -435,8 +438,15 @@ export class ProcessContext {
435438
socket.close();
436439
});
437440

441+
this.#network_manager_listeners.forEach((listeners, event) => {
442+
listeners.forEach((listener) => {
443+
net_manager.remove_event_listener(event, listener);
444+
});
445+
});
446+
438447
this.#port_map.clear();
439448
this.#network_clients.clear();
449+
this.#network_manager_listeners.clear();
440450
}
441451
}
442452

@@ -596,6 +606,34 @@ export class ProcessContext {
596606
return socket;
597607
}
598608

609+
network_add_manager_listener(event: NetworkManagerEvent, listener: NetworkManagerEventListener) {
610+
const net_manager = this.#manager.network_manager;
611+
if (!net_manager) {
612+
throw new Error("No network manager available");
613+
}
614+
615+
if (!this.#network_manager_listeners.has(event)) {
616+
this.#network_manager_listeners.set(event, new Set());
617+
}
618+
619+
this.#network_manager_listeners.get(event)!.add(listener);
620+
621+
net_manager.add_event_listener(event, listener);
622+
}
623+
624+
network_remove_manager_listener(event: NetworkManagerEvent, listener: NetworkManagerEventListener) {
625+
const net_manager = this.#manager.network_manager;
626+
if (!net_manager) {
627+
throw new Error("No network manager available");
628+
}
629+
630+
if (this.#network_manager_listeners.has(event)) {
631+
this.#network_manager_listeners.get(event)!.delete(listener);
632+
}
633+
634+
net_manager.remove_event_listener(event, listener);
635+
}
636+
599637
get bound_ports(): number[] {
600638
return Array.from(this.#port_map.keys());
601639
}
@@ -650,6 +688,12 @@ export class ProcessContext {
650688
const socket = await self.network_connect(host, port);
651689
return socket.create_userspace_proxy();
652690
}, enumerable: true },
691+
network_add_manager_listener: { value: (event: NetworkManagerEvent, listener: NetworkManagerEventListener) => {
692+
self.network_add_manager_listener(event, listener);
693+
}, enumerable: true },
694+
network_remove_manager_listener: { value: (event: NetworkManagerEvent, listener: NetworkManagerEventListener) => {
695+
self.network_remove_manager_listener(event, listener);
696+
}, enumerable: true },
653697
bound_ports: { get: () => self.bound_ports, enumerable: true },
654698
});
655699

src/network_impl/porter.ts

Lines changed: 62 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,14 @@ interface AckConnectMessage extends AckMessage {
7171
sock_id: string;
7272
}
7373

74-
type InboundMessage = IncomingConnectionMessage | ConnectionClosingMessage | AckBindMessage | AckUnbindMessage | DataMessage | AckCloseConnectionMessage | AckConnectMessage;
74+
type InboundMessage =
75+
IncomingConnectionMessage
76+
| ConnectionClosingMessage
77+
| AckBindMessage
78+
| AckUnbindMessage
79+
| DataMessage
80+
| AckCloseConnectionMessage
81+
| AckConnectMessage;
7582

7683
const uint8_to_base64 = (data: Uint8Array): string => {
7784
let binary = "";
@@ -195,9 +202,6 @@ export class PorterBridgeServerSocket extends AbstractServerSocket {
195202
}
196203

197204
export class PorterBridgeNetworkManager extends AbstractNetworkManager {
198-
// port -> server
199-
readonly #port_map = new Map<number, PorterBridgeServerSocket>();
200-
201205
// sock_id -> client
202206
readonly #client_map = new Map<string, PorterBridgeClientSocket>();
203207

@@ -281,7 +285,7 @@ export class PorterBridgeNetworkManager extends AbstractNetworkManager {
281285

282286
switch (data.type) {
283287
case "incoming_connection": {
284-
const socket = this.#port_map.get(data.port);
288+
const socket = this._port_map.get(data.port) as PorterBridgeServerSocket | undefined;
285289
if (socket) {
286290
// create the client
287291
const client = new PorterBridgeClientSocket(data.sock_id, this);
@@ -327,19 +331,48 @@ export class PorterBridgeNetworkManager extends AbstractNetworkManager {
327331
console.log("WebSocket connection established");
328332
this.#reconnect_attempts = 0;
329333

330-
// try to rebind all ports on reconnect
331-
for (const port of this.#port_map.keys()) {
332-
const msg: BindMessage = {
333-
type: "bind_req",
334-
port,
335-
};
336-
this.#ws.send(JSON.stringify(msg));
337-
}
334+
// disabled as may be unexpected given we inform programs of network state change
335+
// // try to rebind all ports on reconnect
336+
// for (const port of this.#port_map.keys()) {
337+
// const msg: BindMessage = {
338+
// type: "bind_req",
339+
// port,
340+
// };
341+
// this.#ws.send(JSON.stringify(msg));
342+
// }
343+
344+
// inform event listeners of state change
345+
this._state_change_listeners.forEach(listener => {
346+
try {
347+
listener(true);
348+
} catch (err) {
349+
console.error("Error in state change listener:", err);
350+
}
351+
});
338352
});
339353

340-
this.#ws.addEventListener("close", () => {
354+
this.#ws.addEventListener("close", async () => {
341355
console.warn("WebSocket connection closed, attempting to reconnect...");
342356

357+
// inform event listeners of state change
358+
this._state_change_listeners.forEach(listener => {
359+
try {
360+
listener(false);
361+
} catch (err) {
362+
console.error("Error in state change listener:", err);
363+
}
364+
});
365+
366+
// passively close all clients and clear maps
367+
const client_close_promises = Array.from(this.#client_map.values()).map(client => client.close(true));
368+
await Promise.allSettled(client_close_promises);
369+
this.#client_map.clear();
370+
371+
// close all servers and clear map
372+
const server_close_promises = Array.from(this._port_map.values()).map(server => server.close());
373+
await Promise.allSettled(server_close_promises);
374+
this._port_map.clear();
375+
343376
// attempt to reconnect with exponential backoff
344377
setTimeout(() => {
345378
this.#connect_to_ws();
@@ -358,12 +391,6 @@ export class PorterBridgeNetworkManager extends AbstractNetworkManager {
358391

359392
protected async _handle_listen_internal(port: number): Promise<AbstractServerSocket> {
360393
const socket = new PorterBridgeServerSocket(port, this);
361-
this.#port_map.set(port, socket);
362-
363-
// add close listener to remove from port map on close
364-
socket.add_event_listener("close", () => {
365-
this.#port_map.delete(port);
366-
});
367394

368395
const msg: BindMessage = {
369396
type: "bind_req",
@@ -375,27 +402,21 @@ export class PorterBridgeNetworkManager extends AbstractNetworkManager {
375402

376403
// wait for ack
377404
// TODO: move this to the routing instead for more efficiency
378-
try {
379-
await new Promise<void>((resolve, reject) => {
380-
const handler = (event: MessageEvent) => {
381-
const data = JSON.parse(event.data) as InboundMessage;
382-
if (data.type === "ack_bind" && data.port === port) {
383-
this.#ws.removeEventListener("message", handler);
384-
if (data.success) {
385-
resolve();
386-
} else {
387-
reject(new Error(data.error || "Unknown error binding port"));
388-
}
405+
await new Promise<void>((resolve, reject) => {
406+
const handler = (event: MessageEvent) => {
407+
const data = JSON.parse(event.data) as InboundMessage;
408+
if (data.type === "ack_bind" && data.port === port) {
409+
this.#ws.removeEventListener("message", handler);
410+
if (data.success) {
411+
resolve();
412+
} else {
413+
reject(new Error(data.error || "Unknown error binding port"));
389414
}
390-
};
415+
}
416+
};
391417

392-
this.#ws.addEventListener("message", handler);
393-
});
394-
} catch (err) {
395-
// cleanup on failure
396-
this.#port_map.delete(port);
397-
throw err;
398-
}
418+
this.#ws.addEventListener("message", handler);
419+
});
399420

400421
return socket;
401422
}
@@ -443,3 +464,5 @@ export class PorterBridgeNetworkManager extends AbstractNetworkManager {
443464
});
444465
}
445466
}
467+
468+
// TODO: make the close/open recovery less up to the impl and more enforced by the abstract

0 commit comments

Comments
 (0)