From 843bd8cbe23d300ccf69e0a985e48d66d93f9762 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iris=20Marie=20K=C3=B6ster?= Date: Wed, 13 Apr 2022 19:54:11 +0200 Subject: [PATCH 1/3] Add WebRTC functionality --- src/common/api/webrtc.js | 269 +++++++++++++++++++++++++++++++++ src/ic/ic-data-data-manager.js | 12 +- src/ic/ic-store.js | 6 + 3 files changed, 286 insertions(+), 1 deletion(-) create mode 100644 src/common/api/webrtc.js diff --git a/src/common/api/webrtc.js b/src/common/api/webrtc.js new file mode 100644 index 00000000..e088ef12 --- /dev/null +++ b/src/common/api/webrtc.js @@ -0,0 +1,269 @@ +/** + * This file is part of VILLASweb. + * + * VILLASweb is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * VILLASweb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with VILLASweb. If not, see . + ******************************************************************************/ +import AppDispatcher from '../app-dispatcher'; + +class WebRTC { + constructor(sessionurl, identifier) { + this.identifier = identifier + this.first = false; + this.polite = false; + this.ignoreOffer = false; + this.makingOffer = false; + + this.peerConnection = null; + this.dataChannel = null; + this.signalingClient = null; + + this.iceUsername = 'villas'; + this.icePassword = 'villas'; + this.iceUrls = [ + 'stun:stun.0l.de:3478', + 'turn:turn.0l.de:3478?transport=udp', + 'turn:turn.0l.de:3478?transport=tcp' + ]; + + this.connectPeers(sessionurl) + } + + // Connect the two peers. Normally you look for and connect to a remote + // machine here, but we're just connecting two local objects, so we can + // bypass that step. + connectPeers(sessionurl) { + // Create the local connection and its event listeners + this.peerConnection = new RTCPeerConnection({ + iceServers: [{ + username: this.iceUsername, + credential: this.icePassword, + urls: this.iceUrls + }] + }); + + this.peerConnection.onicecandidate = this.handleIceCandidate.bind(this); + this.peerConnection.onnegotiationneeded = this.handleNegotationNeeded.bind(this); + this.peerConnection.ondatachannel = this.handleNewDataChannel.bind(this) + + this.peerConnection.onconnectionstatechange = () => console.info('Connection state changed:', this.peerConnection.connectionState); + this.peerConnection.onsignalingstatechange = () => console.info('Signaling state changed:', this.peerConnection.signalingState); + this.peerConnection.oniceconnectionstatechange = () => console.info('ICE connection state changed:', this.peerConnection.iceConnectionState); + this.peerConnection.onicegatheringstatechange = () => console.info('ICE gathering state changed:', this.peerConnection.iceGatheringState); + + this.hallo() + + this.signalingClient = new WebSocket(sessionurl); + this.signalingClient.onmessage = this.handleSignalingMessage.bind(this); + + // Some more logging + this.signalingClient.onopen = (e) => console.info('Connected to signaling channel', e); + this.signalingClient.onerror = (e) => console.error('Failed to establish signaling connection', e); + } + + hallo() { + console.info("peer connection (hallo):") + console.info(this.peerConnection) + } + + handleIceCandidate(event) { + if (event.candidate == null) { + console.info('Candidate gathering completed'); + return; + } + + console.info('New local ICE Candidate', event.candidate); + + let msg = { + candidate: event.candidate.toJSON() + }; + console.info('Sending signaling message', msg); + this.signalingClient.send(JSON.stringify(msg)); + } + + async handleNegotationNeeded() { + console.info('Negotation needed!'); + + try { + this.makingOffer = true; + await this.peerConnection.setLocalDescription(); + let msg = { + description: this.peerConnection.localDescription.toJSON() + }; + console.info('Sending signaling message', msg); + this.signalingClient.send(JSON.stringify(msg)); + } catch (err) { + console.error(err); + } finally { + this.makingOffer = false; + } + } + + handleNewDataChannel(e) { + console.info('New datachannel', e.channel) + + //this.handleDataChannel(e.channel); + this.handleDataChannel(e.channel).bind(this); + } + + handleDataChannel(ch) { + this.dataChannel = ch; + + this.dataChannel.onopen = () => console.info('Datachannel opened'); + this.dataChannel.onclose = () => console.info('Datachannel closed'); + this.dataChannel.onmessage = this.handleDataChannelMessage.bind(this); + } + + async handleSignalingMessage(event) { + let msg = JSON.parse(event.data); + + console.info('Received signaling message', msg); + + try { + if (msg.control !== undefined) { + this.first = true; + for (var connection of msg.control.connections) { + if (connection.id < msg.control.connection_id) + this.first = false; + } + + this.polite = this.first; + + console.info('Role', { + polite: this.polite, + first: this.first + }) + + if (!this.first) { + // Create the data channel and establish its event listeners + let ch = this.peerConnection.createDataChannel('villas'); + + this.handleDataChannel(ch); + } + } else if (msg.description !== undefined) { + const offerCollision = (msg.description.type == 'offer') && + (this.makingOffer || this.peerConnection.signalingState != 'stable'); + + this.ignoreOffer = !this.polite && offerCollision; + if (this.ignoreOffer) { + return; + } + + await this.peerConnection.setRemoteDescription(msg.description); + console.info(msg.description); + if (msg.description.type == 'offer') { + await this.peerConnection.setLocalDescription(); + let msg = { + description: this.peerConnection.localDescription.toJSON() + } + this.signalingClient.send(JSON.stringify(msg)) + } + } else if (msg.candidate !== undefined) { + try { + console.info('New remote ICE candidate', msg.candidate); + await this.peerConnection.addIceCandidate(msg.candidate); + } catch (err) { + if (!this.ignoreOffer) { + throw err; + } + } + } + } catch (err) { + console.error(err); + } + } + + handleDataChannel(channel) { + this.dataChannel = channel; + this.dataChannel.onopen = () => console.info('Datachannel opened'); + this.dataChannel.onclose = () => console.info('Datachannel closed'); + this.dataChannel.onmessage = this.handleDataChannelMessage; + } + + async jsonToMessageArray(msgJson) { + // parse incoming message into usable data + if (msgJson.length === 0) { + return null; + } + + const ts = msgJson.ts.origin[0] * 1e3 + msgJson.ts.origin[1] * 1e-6 + + return { + version: 2, + type: 0, + source_index: 0, + length: length, + sequence: msgJson.sequence, + timestamp: ts, + values: msgJson.data, + blob: new DataView(msgJson.data), + // id: id + }; + } + + // Handle onmessage events for the receiving channel. + // These are the data messages sent by the sending channel. + async handleDataChannelMessage(event) { + var dec = new TextDecoder(); + + var raw = event.data; + var msg = dec.decode(await raw.arrayBuffer()); + var msgJson = JSON.parse(msg); + + console.info('Received message', msgJson); + let msgarr = null + + + const ts = msgJson[0].ts.origin[0] * 1e3 + msgJson[0].ts.origin[1] * 1e-6 + let buffer = new Float64Array(msgJson[0].data).buffer + + msgarr = { + version: 2, + type: 0, + source_index: 0, + length: 5, + sequence: msgJson[0].sequence, + timestamp: ts, + values: msgJson[0].data, + blob: new DataView(buffer) + }; + + if (msgarr) { + AppDispatcher.dispatch({ + type: 'icData/data-changed', + data: [msgarr], + id: 547648 + }); + } + } + + disconnectPeers() { + console.log("disconnecting peers") + + if (this.signalingClient) + this.signalingClient.close() + + if (this.dataChannel) + this.dataChannel.close(); + + if (this.peerConnection) + this.peerConnection.close(); + + this.dataChannel = null; + this.peerConnection = null; + this.signalingClient = null; + } + +} + +export default WebRTC; diff --git a/src/ic/ic-data-data-manager.js b/src/ic/ic-data-data-manager.js index 75d57d91..31f152d3 100644 --- a/src/ic/ic-data-data-manager.js +++ b/src/ic/ic-data-data-manager.js @@ -16,6 +16,7 @@ ******************************************************************************/ import WebsocketAPI from '../common/api/websocket-api'; +import WebRTC from '../common/api/webrtc'; import AppDispatcher from '../common/app-dispatcher'; import RestAPI from "../common/api/rest-api"; @@ -25,6 +26,7 @@ const OFFSET_VERSION = 4; class IcDataDataManager { constructor() { this._sockets = {}; + this._webrtc = null; } open(websocketurl, identifier) { @@ -35,6 +37,10 @@ class IcDataDataManager { this._sockets[identifier] = new WebsocketAPI(websocketurl, { onOpen: (event) => this.onOpen(event, identifier, true), onClose: (event) => this.onClose(event, identifier), onMessage: (event) => this.onMessage(event, identifier) }); } + openWebRTC(sessionurl, identifier) { + this._webrtc = new WebRTC(sessionurl, identifier) + } + update(websocketurl, identifier) { if (this._sockets[identifier] != null) { if (this._sockets[identifier].websocketurl !== websocketurl) { @@ -52,6 +58,10 @@ class IcDataDataManager { delete this._sockets[identifier]; } } + + if (this._webrtc) { + this._webrtc.disconnectPeers(); + } } send(message, identifier) { @@ -166,7 +176,7 @@ class IcDataDataManager { return buffer; } - updateSignalValueInWidgets(signalID, newValues){ + updateSignalValueInWidgets(signalID, newValues) { AppDispatcher.dispatch({ type: 'widgets/signal-value-changed', signalID: signalID, diff --git a/src/ic/ic-store.js b/src/ic/ic-store.js index 3f9b83f1..528c5ae7 100644 --- a/src/ic/ic-store.js +++ b/src/ic/ic-store.js @@ -48,6 +48,12 @@ class InfrastructureComponentStore extends ArrayStore { for (let ic of action.data) { if (ic.websocketurl != null && ic.websocketurl !== '') { ICDataDataManager.open(ic.websocketurl, ic.id); + } else if (ic.statusupdateraw != null && ic.statusupdateraw.properties != null) { + let rawProps = ic.statusupdateraw.properties + if (rawProps != null && typeof rawProps.server !== 'undefined') { + let url = rawProps.server + '/' + rawProps.session + ICDataDataManager.openWebRTC(url, ic.id); + } } else { NotificationsDataManager.addNotification(NotificationsFactory.WEBSOCKET_URL_WARN(ic.name, ic.uuid)); } From 3aee36f06e66847e291bfda2c51c041b6ac3e3d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iris=20Marie=20K=C3=B6ster?= Date: Wed, 20 Apr 2022 11:58:15 +0200 Subject: [PATCH 2/3] read messages in villas.web format --- src/common/api/webrtc.js | 84 +++++++++++++++------------------------- 1 file changed, 31 insertions(+), 53 deletions(-) diff --git a/src/common/api/webrtc.js b/src/common/api/webrtc.js index e088ef12..341ab28c 100644 --- a/src/common/api/webrtc.js +++ b/src/common/api/webrtc.js @@ -16,6 +16,9 @@ ******************************************************************************/ import AppDispatcher from '../app-dispatcher'; +const OFFSET_TYPE = 2; +const OFFSET_VERSION = 4; + class WebRTC { constructor(sessionurl, identifier) { this.identifier = identifier @@ -112,8 +115,7 @@ class WebRTC { handleNewDataChannel(e) { console.info('New datachannel', e.channel) - //this.handleDataChannel(e.channel); - this.handleDataChannel(e.channel).bind(this); + this.handleDataChannel(e.channel); } handleDataChannel(ch) { @@ -182,67 +184,37 @@ class WebRTC { console.error(err); } } - - handleDataChannel(channel) { - this.dataChannel = channel; - this.dataChannel.onopen = () => console.info('Datachannel opened'); - this.dataChannel.onclose = () => console.info('Datachannel closed'); - this.dataChannel.onmessage = this.handleDataChannelMessage; - } - - async jsonToMessageArray(msgJson) { - // parse incoming message into usable data - if (msgJson.length === 0) { - return null; - } - - const ts = msgJson.ts.origin[0] * 1e3 + msgJson.ts.origin[1] * 1e-6 - - return { - version: 2, - type: 0, - source_index: 0, - length: length, - sequence: msgJson.sequence, - timestamp: ts, - values: msgJson.data, - blob: new DataView(msgJson.data), - // id: id - }; - } // Handle onmessage events for the receiving channel. // These are the data messages sent by the sending channel. async handleDataChannelMessage(event) { - var dec = new TextDecoder(); + let data = new DataView(await event.data.arrayBuffer()) - var raw = event.data; - var msg = dec.decode(await raw.arrayBuffer()); - var msgJson = JSON.parse(msg); + if (data.byteLength === 0) { + return null; + } - console.info('Received message', msgJson); - let msgarr = null + const source_index = data.getUint8(1); + const bits = data.getUint8(0); + const length = data.getUint16(0x02, 1); + const bytes = length * 4 + 16; - - const ts = msgJson[0].ts.origin[0] * 1e3 + msgJson[0].ts.origin[1] * 1e-6 - let buffer = new Float64Array(msgJson[0].data).buffer - - msgarr = { - version: 2, - type: 0, - source_index: 0, - length: 5, - sequence: msgJson[0].sequence, - timestamp: ts, - values: msgJson[0].data, - blob: new DataView(buffer) + let msgarr = { + version: (bits >> OFFSET_VERSION) & 0xF, + type: (bits >> OFFSET_TYPE) & 0x3, + source_index: source_index, + length: length, + sequence: data.getUint32(0x04, 1), + timestamp: data.getUint32(0x08, 1) * 1e3 + data.getUint32(0x0C, 1) * 1e-6, + values: new Float32Array(data.buffer, data.byteOffset + 0x10, length), + blob: new DataView(data.buffer, data.byteOffset + 0x00, bytes), }; if (msgarr) { AppDispatcher.dispatch({ type: 'icData/data-changed', data: [msgarr], - id: 547648 + id: this.identifier }); } } @@ -250,14 +222,20 @@ class WebRTC { disconnectPeers() { console.log("disconnecting peers") - if (this.signalingClient) + if (this.signalingClient) { + console.info("close signaling client") this.signalingClient.close() + } - if (this.dataChannel) + if (this.dataChannel) { + console.info("close data channel") this.dataChannel.close(); + } - if (this.peerConnection) + if (this.peerConnection) { + console.info("close peer connection") this.peerConnection.close(); + } this.dataChannel = null; this.peerConnection = null; From fc831a8149ed809ae138327552674d1142f8c7f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iris=20Marie=20K=C3=B6ster?= Date: Fri, 22 Apr 2022 17:03:26 +0200 Subject: [PATCH 3/3] harmonize with websocket handling --- src/common/api/webrtc.js | 48 ++++++++-------------------------- src/ic/ic-data-data-manager.js | 22 +++++++++++----- 2 files changed, 26 insertions(+), 44 deletions(-) diff --git a/src/common/api/webrtc.js b/src/common/api/webrtc.js index 341ab28c..1f086295 100644 --- a/src/common/api/webrtc.js +++ b/src/common/api/webrtc.js @@ -20,7 +20,7 @@ const OFFSET_TYPE = 2; const OFFSET_VERSION = 4; class WebRTC { - constructor(sessionurl, identifier) { + constructor(sessionurl, identifier, callbacks) { this.identifier = identifier this.first = false; this.polite = false; @@ -39,13 +39,14 @@ class WebRTC { 'turn:turn.0l.de:3478?transport=tcp' ]; - this.connectPeers(sessionurl) + console.log(callbacks) + this.onOpen = callbacks.onOpen.bind(this); + this.onMessage = callbacks.onMessage.bind(this); + this.onClose = callbacks.onClose.bind(this); + this.connectPeers(sessionurl, callbacks); } - // Connect the two peers. Normally you look for and connect to a remote - // machine here, but we're just connecting two local objects, so we can - // bypass that step. - connectPeers(sessionurl) { + connectPeers(sessionurl, callbacks) { // Create the local connection and its event listeners this.peerConnection = new RTCPeerConnection({ iceServers: [{ @@ -184,39 +185,12 @@ class WebRTC { console.error(err); } } - + // Handle onmessage events for the receiving channel. // These are the data messages sent by the sending channel. - async handleDataChannelMessage(event) { - let data = new DataView(await event.data.arrayBuffer()) - - if (data.byteLength === 0) { - return null; - } - - const source_index = data.getUint8(1); - const bits = data.getUint8(0); - const length = data.getUint16(0x02, 1); - const bytes = length * 4 + 16; - - let msgarr = { - version: (bits >> OFFSET_VERSION) & 0xF, - type: (bits >> OFFSET_TYPE) & 0x3, - source_index: source_index, - length: length, - sequence: data.getUint32(0x04, 1), - timestamp: data.getUint32(0x08, 1) * 1e3 + data.getUint32(0x0C, 1) * 1e-6, - values: new Float32Array(data.buffer, data.byteOffset + 0x10, length), - blob: new DataView(data.buffer, data.byteOffset + 0x00, bytes), - }; - - if (msgarr) { - AppDispatcher.dispatch({ - type: 'icData/data-changed', - data: [msgarr], - id: this.identifier - }); - } + async handleDataChannelMessage(event) { + let data = await event.data.arrayBuffer() + this.onMessage(data, this.identifier) } disconnectPeers() { diff --git a/src/ic/ic-data-data-manager.js b/src/ic/ic-data-data-manager.js index 31f152d3..785d6a68 100644 --- a/src/ic/ic-data-data-manager.js +++ b/src/ic/ic-data-data-manager.js @@ -26,7 +26,7 @@ const OFFSET_VERSION = 4; class IcDataDataManager { constructor() { this._sockets = {}; - this._webrtc = null; + this._webrtc_connections = {}; } open(websocketurl, identifier) { @@ -34,11 +34,14 @@ class IcDataDataManager { if (this._sockets[identifier] != null) return; // already open? - this._sockets[identifier] = new WebsocketAPI(websocketurl, { onOpen: (event) => this.onOpen(event, identifier, true), onClose: (event) => this.onClose(event, identifier), onMessage: (event) => this.onMessage(event, identifier) }); + this._sockets[identifier] = new WebsocketAPI(websocketurl, { onOpen: (event) => this.onOpen(event, identifier, true), onClose: (event) => this.onClose(event, identifier), onMessage: (event) => this.onMessage(event.data, identifier) }); } openWebRTC(sessionurl, identifier) { - this._webrtc = new WebRTC(sessionurl, identifier) + if (this._webrtc_connections[identifier] != null) + return; // already connected + + this._webrtc_connections[identifier] = new WebRTC(sessionurl, identifier, { onOpen: (event) => this.onOpen(event, identifier, true), onClose: (event) => this.onClose(event, identifier), onMessage: (event) => this.onMessage(event, identifier) }); } update(websocketurl, identifier) { @@ -59,8 +62,12 @@ class IcDataDataManager { } } - if (this._webrtc) { - this._webrtc.disconnectPeers(); + // close all open WebRTC connections + for (var rtc_id in this._webrtc_connections) { + if (this._webrtc_connections.hasOwnProperty(rtc_id)) { + this._webrtc_connections[rtc_id].disconnectPeers(); + delete this._webrtc_connections[rtc_id]; + } } } @@ -95,8 +102,9 @@ class IcDataDataManager { delete this._sockets[identifier]; } - onMessage(event, identifier) { - var msgs = this.bufferToMessageArray(event.data); + onMessage(dataBuffer, identifier) { + console.log(dataBuffer) + var msgs = this.bufferToMessageArray(dataBuffer); if (msgs.length > 0) { AppDispatcher.dispatch({