@@ -25,7 +25,14 @@ interface CloseConnectionMessage extends BaseMessage {
2525 sock_id : string ;
2626}
2727
28- type OutboundMessage = BindMessage | UnbindMessage | DataMessage | CloseConnectionMessage ;
28+ interface ConnectMessage extends BaseMessage {
29+ type : "connect_req" ;
30+ host : string ;
31+ port : number ;
32+ force_sock_id ?: string ;
33+ }
34+
35+ type OutboundMessage = BindMessage | UnbindMessage | DataMessage | CloseConnectionMessage | ConnectMessage ;
2936
3037interface IncomingConnectionMessage extends BaseMessage {
3138 type : "incoming_connection" ;
@@ -35,32 +42,36 @@ interface IncomingConnectionMessage extends BaseMessage {
3542
3643interface ConnectionClosingMessage extends BaseMessage {
3744 type : "connection_closing" ;
38- port : number ;
45+ port ? : number ;
3946 sock_id : string ;
4047}
4148
42- interface AckBindMessage extends BaseMessage {
43- type : "ack_bind" ;
44- port : number ;
49+ interface AckMessage extends BaseMessage {
4550 success : boolean ;
4651 error ?: string ;
4752}
4853
49- interface AckUnbindMessage extends BaseMessage {
54+ interface AckBindMessage extends AckMessage {
55+ type : "ack_bind" ;
56+ port : number ;
57+ }
58+
59+ interface AckUnbindMessage extends AckMessage {
5060 type : "ack_unbind" ;
5161 port : number ;
52- success : boolean ;
53- error ?: string ;
5462}
5563
56- interface AckCloseConnectionMessage extends BaseMessage {
64+ interface AckCloseConnectionMessage extends AckMessage {
5765 type : "ack_close_connection" ;
5866 sock_id : string ;
59- success : boolean ;
60- error ?: string ;
6167}
6268
63- type InboundMessage = IncomingConnectionMessage | ConnectionClosingMessage | AckBindMessage | AckUnbindMessage | DataMessage | AckCloseConnectionMessage ;
69+ interface AckConnectMessage extends AckMessage {
70+ type : "ack_connect" ;
71+ sock_id : string ;
72+ }
73+
74+ type InboundMessage = IncomingConnectionMessage | ConnectionClosingMessage | AckBindMessage | AckUnbindMessage | DataMessage | AckCloseConnectionMessage | AckConnectMessage ;
6475
6576const uint8_to_base64 = ( data : Uint8Array ) : string => {
6677 let binary = "" ;
@@ -79,7 +90,12 @@ export class PorterBridgeClientSocket extends AbstractClientSocket {
7990 this . ready_state = SocketReadyState . OPEN ;
8091 }
8192
82- protected async _handle_close_internal ( ) : Promise < void > {
93+ protected async _handle_close_internal ( passive : boolean ) : Promise < void > {
94+ if ( passive ) {
95+ // nothing to do
96+ return ;
97+ }
98+
8399 const msg : CloseConnectionMessage = {
84100 type : "close_connection" ,
85101 sock_id : this . id ,
@@ -187,7 +203,7 @@ export class PorterBridgeNetworkManager extends AbstractNetworkManager {
187203
188204 constructor ( ) {
189205 super ( ) ;
190- this . #connect ( ) ;
206+ this . #connect_to_ws ( ) ;
191207 }
192208
193209 #ws: WebSocket ;
@@ -256,7 +272,7 @@ export class PorterBridgeNetworkManager extends AbstractNetworkManager {
256272 } ) ;
257273 }
258274
259- #connect ( ) {
275+ #connect_to_ws ( ) {
260276 // open websocket connection on port 9000, handling all routing (rather than making a separate listener/connection for each socket)
261277 this . #ws = new WebSocket ( "ws://127.0.0.1:9000" ) ;
262278
@@ -285,7 +301,8 @@ export class PorterBridgeNetworkManager extends AbstractNetworkManager {
285301 case "connection_closing" : {
286302 const client = this . #client_map. get ( data . sock_id ) ;
287303 if ( client ) {
288- client . close ( ) . catch ( err => {
304+ // emit passive close event
305+ client . close ( true ) . catch ( err => {
289306 console . error ( `Error closing client socket ${ data . sock_id } :` , err ) ;
290307 } ) ;
291308 } else {
@@ -325,7 +342,7 @@ export class PorterBridgeNetworkManager extends AbstractNetworkManager {
325342
326343 // attempt to reconnect with exponential backoff
327344 setTimeout ( ( ) => {
328- this . #connect ( ) ;
345+ this . #connect_to_ws ( ) ;
329346 } , Math . min ( 50 * 2 ** this . #reconnect_attempts, 3000 ) ) ;
330347 this . #reconnect_attempts++ ;
331348 } ) ;
@@ -384,6 +401,45 @@ export class PorterBridgeNetworkManager extends AbstractNetworkManager {
384401 }
385402
386403 async connect ( host : string , port : number ) : Promise < AbstractClientSocket > {
387- throw new Error ( "Outbound connections are not supported in porter networking yet" ) ;
404+ // generate a sock id that we will provide to immediately reserve in the map without waiting for a reply
405+ const sock_id = crypto . randomUUID ( ) ;
406+
407+ const msg : ConnectMessage = {
408+ type : "connect_req" ,
409+ host,
410+ port,
411+ force_sock_id : sock_id ,
412+ } ;
413+
414+ await this . wait_for_ws_ready ( ) ;
415+ this . #ws. send ( JSON . stringify ( msg ) ) ;
416+
417+ return new Promise < PorterBridgeClientSocket > ( ( resolve , reject ) => {
418+ const handler = ( event : MessageEvent ) => {
419+ const data = JSON . parse ( event . data ) as InboundMessage ;
420+
421+ // TODO: move this routing logic to the manager instead of having each connect call add its own listener for efficiency
422+ if ( data . type === "ack_connect" && data . sock_id === sock_id ) {
423+ this . #ws. removeEventListener ( "message" , handler ) ;
424+
425+ if ( data . success ) {
426+ // create the client and add to map
427+ const client = new PorterBridgeClientSocket ( sock_id , this ) ;
428+ this . #client_map. set ( sock_id , client ) ;
429+
430+ // cleanup on close
431+ client . add_event_listener ( "close" , ( ) => {
432+ this . #client_map. delete ( sock_id ) ;
433+ } ) ;
434+
435+ resolve ( client ) ;
436+ } else {
437+ reject ( new Error ( data . error || `Failed to connect to ${ host } :${ port } ` ) ) ;
438+ }
439+ }
440+ } ;
441+
442+ this . #ws. addEventListener ( "message" , handler ) ;
443+ } ) ;
388444 }
389445}
0 commit comments