diff --git a/docker-compose-macos.yaml b/docker-compose-macos.yaml index 2d0840e20a..63b408a360 100644 --- a/docker-compose-macos.yaml +++ b/docker-compose-macos.yaml @@ -233,7 +233,7 @@ services: container_name: devicehub-api-groups-engine env_file: - scripts/variables.env - command: stf groups-engine --connect-push tcp://devicehub-triproxy-app:7170 --connect-push-dev tcp://devicehub-triproxy-dev:7270 + command: stf groups-engine --connect-push tcp://devicehub-triproxy-app:7170 --connect-push-dev tcp://devicehub-triproxy-dev:7270 --connect-sub tcp://devicehub-triproxy-app:7150 depends_on: devicehub-migrate: condition: service_completed_successfully diff --git a/docker-compose-prod.yaml b/docker-compose-prod.yaml index 123ce8ded9..b66272d56b 100644 --- a/docker-compose-prod.yaml +++ b/docker-compose-prod.yaml @@ -249,7 +249,7 @@ services: container_name: devicehub-api-groups-engine env_file: - scripts/variables.env - command: stf groups-engine --connect-push tcp://devicehub-triproxy-app:7170 --connect-push-dev tcp://devicehub-triproxy-dev:7270 + command: stf groups-engine --connect-push tcp://devicehub-triproxy-app:7170 --connect-push-dev tcp://devicehub-triproxy-dev:7270 --connect-sub tcp://devicehub-triproxy-app:7150 depends_on: devicehub-migrate: condition: service_completed_successfully diff --git a/lib/cli/groups-engine/index.js b/lib/cli/groups-engine/index.js index e523713502..b6ce10e98a 100644 --- a/lib/cli/groups-engine/index.js +++ b/lib/cli/groups-engine/index.js @@ -17,6 +17,12 @@ export const builder = function(yargs) { array: true, demand: true }) + .option('connect-sub', { + alias: 's', + describe: 'App-side ZeroMQ PUB endpoint to subscribe to.', + array: true, + demand: true + }) .epilog('Each option can be be overwritten with an environment variable ' + 'by converting the option to uppercase, replacing dashes with ' + 'underscores and prefixing it with `STF_GROUPS_ENGINE_` .)') @@ -26,6 +32,7 @@ export const handler = function(argv) { endpoints: { push: argv.connectPush, pushdev: argv.connectPushDev, + sub: argv.connectSub, } }) } diff --git a/lib/cli/local/index.js b/lib/cli/local/index.js index 2eed033d86..a5b9bb5141 100644 --- a/lib/cli/local/index.js +++ b/lib/cli/local/index.js @@ -421,6 +421,7 @@ export const handler = function(argv) { 'groups-engine', '--connect-push', argv.bindAppPull, '--connect-push-dev', argv.bindDevPull, + '--connect-sub', argv.bindAppPub, ], [ // websocket 'websocket', diff --git a/lib/db/models/all/model.js b/lib/db/models/all/model.js index ee5eb4d042..54b6b20ba6 100644 --- a/lib/db/models/all/model.js +++ b/lib/db/models/all/model.js @@ -605,29 +605,39 @@ export const unsetDeviceOwner = function(serial) { } // dbapi.setDevicePresent = function(serial) { -export const setDevicePresent = function(serial) { +export const setDevicePresent = function(serial, presenceChangedAt) { + const timestamp = presenceChangedAt ? new Date(presenceChangedAt) : getNow() + const filter = presenceChangedAt ? + {serial, $or: [{presenceChangedAt: {$lte: timestamp}}, {presenceChangedAt: {$exists: false}}]} : + {serial} + return db.devices.updateOne( - {serial: serial}, + filter, { $set: { present: true, - presenceChangedAt: getNow() + presenceChangedAt: timestamp } } ) } // dbapi.setDeviceAbsent = function(serial) { -export const setDeviceAbsent = function(serial) { +export const setDeviceAbsent = function(serial, presenceChangedAt) { + const timestamp = presenceChangedAt ? new Date(presenceChangedAt) : getNow() + const filter = presenceChangedAt ? + {serial, $or: [{presenceChangedAt: {$lte: timestamp}}, {presenceChangedAt: {$exists: false}}]} : + {serial} + return db.devices.updateOne( - {serial: serial}, + filter, { $set: { owner: null, usage: null, logs_enabled: false, present: false, - presenceChangedAt: getNow() + presenceChangedAt: timestamp } } ) diff --git a/lib/units/groups-engine/index.js b/lib/units/groups-engine/index.js index 7ccb596dd6..0542a4de74 100644 --- a/lib/units/groups-engine/index.js +++ b/lib/units/groups-engine/index.js @@ -9,14 +9,19 @@ export default (async function(options) { const { push , pushdev + , sub , channelRouter } = await db.createZMQSockets(options.endpoints, log) await db.connect() - devicesWatcher(push, pushdev, channelRouter) + devicesWatcher(push, pushdev, channelRouter, sub) - lifecycle.observe(() => - [push, pushdev].forEach((sock) => sock.close()) - ) + lifecycle.observe(() => { + push.close() + pushdev.close() + if (sub) { + sub.close() + } + }) log.info('Groups engine started') }) diff --git a/lib/units/groups-engine/watchers/devices.js b/lib/units/groups-engine/watchers/devices.js index ee6470403f..62f469dd1a 100644 --- a/lib/units/groups-engine/watchers/devices.js +++ b/lib/units/groups-engine/watchers/devices.js @@ -1,4 +1,3 @@ -import {WireRouter} from '../../../wire/router.js' import _ from 'lodash' import util from 'util' import {v4 as uuidv4} from 'uuid' @@ -8,20 +7,10 @@ import wireutil from '../../../wire/util.js' import wire from '../../../wire/index.js' import dbapi from '../../../db/api.js' import db from '../../../db/index.js' -import {LeaveGroupMessage} from '../../../wire/wire.js' -export default (function(push, pushdev, channelRouter) { +import {UngroupMessage} from '../../../wire/wire.js' +import {runTransaction} from '../../../wire/transmanager.js' +export default (function(push, pushdev, channelRouter, sub) { const log = logger.createLogger('watcher-devices') - function sendReleaseDeviceControl(serial, channel) { - push.send([ - channel, - wireutil.envelope(new wire.UngroupMessage(wireutil.toDeviceRequirements({ - serial: { - value: serial, - match: 'exact' - } - }))) - ]) - } function sendDeviceGroupChange(id, group, serial, originName) { pushdev.send([ wireutil.global, @@ -42,24 +31,21 @@ export default (function(push, pushdev, channelRouter) { wireutil.envelope(new wire.DeviceChangeMessage(publishDevice(), action, device2.group.origin, timeutil.now('nano'))) ]) } - function sendReleaseDeviceControlAndDeviceGroupChange(device, sendDeviceGroupChangeWrapper) { - let messageListener - const responseTimer = setTimeout(function() { - channelRouter.removeListener(wireutil.global, messageListener) - sendDeviceGroupChangeWrapper() - }, 5000) - messageListener = new WireRouter() - .on(LeaveGroupMessage, function(channel, message) { - if (message.serial === device.serial && - message.owner.email === device.owner.email) { - clearTimeout(responseTimer) - channelRouter.removeListener(wireutil.global, messageListener) - sendDeviceGroupChangeWrapper() - } - }) - .handler() - channelRouter.on(wireutil.global, messageListener) - sendReleaseDeviceControl(device.serial, device.channel) + async function sendReleaseDeviceControlAndDeviceGroupChange(device, sendDeviceGroupChangeWrapper) { + try { + await runTransaction(device.channel, UngroupMessage, { + requirements: wireutil.toDeviceRequirements({ + serial: { + value: device.serial, + match: 'exact' + } + }) + }, {sub, push, channelRouter, timeout: 10000}) + } + catch (/** @type {any} */ err) { + log.warn('Ungroup transaction for %s failed: %s', device.serial, err?.message) + } + sendDeviceGroupChangeWrapper() } let changeStream db.connect().then(client => { @@ -169,7 +155,7 @@ export default (function(push, pushdev, channelRouter) { if (group) { if (isDeleted) { if (oldDoc.owner) { - sendReleaseDeviceControlAndDeviceGroupChange(oldDoc, sendDeviceGroupChangeOnDeviceDeletion) + await sendReleaseDeviceControlAndDeviceGroupChange(oldDoc, sendDeviceGroupChangeOnDeviceDeletion) return } sendDeviceGroupChangeOnDeviceDeletion() @@ -185,7 +171,7 @@ export default (function(push, pushdev, channelRouter) { } if (isChangeCurrentGroup) { if (newDoc.owner && group.users.indexOf(newDoc.owner.email) < 0) { - sendReleaseDeviceControlAndDeviceGroupChange(newDoc, sendDeviceGroupChangeOnDeviceCurrentGroupUpdating) + await sendReleaseDeviceControlAndDeviceGroupChange(newDoc, sendDeviceGroupChangeOnDeviceCurrentGroupUpdating) } else { sendDeviceGroupChangeOnDeviceCurrentGroupUpdating() diff --git a/lib/units/processor/index.ts b/lib/units/processor/index.ts index 8287cd0a72..b061bd410f 100644 --- a/lib/units/processor/index.ts +++ b/lib/units/processor/index.ts @@ -150,11 +150,11 @@ export default db.ensureConnectivity(async(options: Options) => { dbapi.initializeIosDeviceState(options.publicIp, message) }) .on(DevicePresentMessage, async (channel, message, data) => { - await dbapi.setDevicePresent(message.serial) + await dbapi.setDevicePresent(message.serial, message.presenceChangedAt) appDealer.send([channel, data]) }) .on(DeviceAbsentMessage, async (channel, message, data) => { - await dbapi.setDeviceAbsent(message.serial) + await dbapi.setDeviceAbsent(message.serial, message.presenceChangedAt) appDealer.send([channel, data]) }) .on(DeviceStatusMessage, (channel, message, data) => { diff --git a/lib/units/reaper/index.ts b/lib/units/reaper/index.ts index 2fde098619..9051254763 100644 --- a/lib/units/reaper/index.ts +++ b/lib/units/reaper/index.ts @@ -66,7 +66,7 @@ export default (async(options: Options) => { log.info('Device "%s" is present', serial) push.send([ wireutil.global, - wireutil.pack(DevicePresentMessage, {serial}) + wireutil.pack(DevicePresentMessage, {serial, presenceChangedAt: Date.now()}) ]) }) @@ -74,7 +74,7 @@ export default (async(options: Options) => { log.info('Reaping device "%s" due to heartbeat timeout', serial) push.send([ wireutil.global, - wireutil.pack(DeviceAbsentMessage, {serial}) + wireutil.pack(DeviceAbsentMessage, {serial, presenceChangedAt: Date.now()}) ]) }) diff --git a/lib/units/websocket/index.js b/lib/units/websocket/index.js deleted file mode 100644 index 280fb188d9..0000000000 --- a/lib/units/websocket/index.js +++ /dev/null @@ -1,1412 +0,0 @@ -/** -* Copyright 2019 contains code contributed by Orange SA, authors: Denis Barbaron - Licensed under the Apache license 2.0 -**/ - -import http from 'http' -import util from 'util' -import Promise from 'bluebird' -import _ from 'lodash' -import postmanRequest from 'postman-request' -import {Adb} from '@u4/adbkit' -import {v4 as uuidv4} from 'uuid' -import logger from '../../util/logger.js' -import wire from '../../wire/index.js' -import wireutil from '../../wire/util.js' -import {WireRouter} from '../../wire/router.js' -import datautil from '../../util/datautil.js' -import lifecycle from '../../util/lifecycle.js' -import cookieSession from './middleware/cookie-session.js' -import ip from './middleware/remote-ip.js' -import auth from './middleware/auth.js' -import * as jwtutil from '../../util/jwtutil.js' -import * as apiutil from '../../util/apiutil.js' -import {Server} from 'socket.io' -import db from '../../db/index.js' -import EventEmitter from 'events' -import generateToken from '../api/helpers/generateToken.js' -import { - UpdateAccessTokenMessage, - DeleteUserMessage, - DeviceChangeMessage, - UserChangeMessage, - GroupChangeMessage, - DeviceGroupChangeMessage, - GroupUserChangeMessage, - DeviceLogMessage, - DeviceIntroductionMessage, - DeviceReadyMessage, - DevicePresentMessage, - DeviceAbsentMessage, - InstalledApplications, - JoinGroupMessage, - JoinGroupByAdbFingerprintMessage, - LeaveGroupMessage, - DeviceStatusMessage, - DeviceIdentityMessage, - TransactionProgressMessage, - TransactionDoneMessage, - TransactionTreeMessage, - DeviceLogcatEntryMessage, - AirplaneModeEvent, - BatteryEvent, - GetServicesAvailabilityMessage, - DeviceBrowserMessage, - ConnectivityEvent, - PhoneStateEvent, - RotationEvent, - CapabilitiesMessage, - ReverseForwardsEvent, - TemporarilyUnavailableMessage, - UpdateRemoteConnectUrl, - KeyDownMessage, - KeyUpMessage, - KeyPressMessage -} from '../../wire/wire.js' -import AllModel from '../../db/models/all/index.js' -import UserModel from '../../db/models/user/index.js' -const request = Promise.promisifyAll(postmanRequest) -export default (async function(options) { - const log = logger.createLogger('websocket') - const server = http.createServer() - - const io = new Server(server, { - serveClient: false, - transports: ['websocket'], - pingTimeout: 60000, - pingInterval: 30000 - }) - - const channelRouter = new EventEmitter() - const { - sub - , subdev - , push - , pushdev - } = await db.createZMQSockets({...options.endpoints}, log) - await db.connect({push, pushdev, channelRouter}) - - ;[wireutil.global].forEach(function(channel) { - log.info('Subscribing to permanent webosocket channel "%s"', channel) - sub.subscribe(channel) - }) - sub.on('message', function(channel, data) { - channelRouter.emit(channel.toString(), channel, data) - }) - - io.use(cookieSession({ - name: options.ssid, - keys: [options.secret] - })) - io.use(ip({ - trust: function() { - return true - } - })) - io.use(auth({secret: options.secret})) - io.on('connection', function(socket) { - const req = socket.request - - // @ts-ignore - const {user} = req - const channels = [] - - // @ts-ignore - user.ip = socket.handshake.query.uip || req.ip - socket.emit('socket.ip', user.ip) - function joinChannel(channel) { - channels.push(channel) - log.info('Subscribing to permanent websocket joinChannel channel "%s"', channel) - channelRouter.on(channel, messageListener) - sub.subscribe(channel) - } - function leaveChannel(channel) { - _.pull(channels, channel) - channelRouter.removeListener(channel, messageListener) - sub.unsubscribe(channel) - } - - const deviceIsOwned = (channel) => user?.ownedChannels?.has(channel) - - function trySendPush(args) { - if (deviceIsOwned(args[0])) { - return push.send(args) - } - } - - function createKeyHandler(Klass) { - return function(channel, data) { - if (user?.ownedChannels?.has(channel)) { - push.send([channel, wireutil.pack(Klass, { - key: data.key - })]) - } - } - } - let disconnectSocket - var messageListener = new WireRouter() - .on(UpdateAccessTokenMessage, function() { - socket.emit('user.keys.accessToken.updated') - }) - .on(DeleteUserMessage, function() { - disconnectSocket(true) - }) - .on(DeviceChangeMessage, function(channel, message) { - if (user.groups.subscribed.indexOf(message.device.group.id) > -1) { - socket.emit('device.change', { - important: true, - data: { - serial: message.device.serial, - group: message.device.group - } - }) - } - if (user.groups.subscribed.indexOf(message.device.group.origin) > -1 || - user.groups.subscribed.indexOf(message.oldOriginGroupId) > -1) { - socket.emit('user.settings.devices.' + message.action, message) - } - }) - .on(UserChangeMessage, function(channel, message) { - Promise.map(message.targets, function(target) { - socket.emit('user.' + target + '.users.' + message.action, message) - }) - }) - .on(GroupChangeMessage, function(channel, message) { - if (user.privilege === 'admin' || - user.email === message.group.owner.email || - !apiutil.isOriginGroup(message.group.class) && - (message.isChangedDates || message.isChangedClass || message.devices.length)) { - socket.emit('user.settings.groups.' + message.action, message) - } - if (message.subscribers.indexOf(user.email) > -1) { - socket.emit('user.view.groups.' + message.action, message) - } - }) - .on(DeviceGroupChangeMessage, function(channel, message) { - if (user.groups.subscribed.indexOf(message.id) > -1) { - if (user.groups.subscribed.indexOf(message.group.id) > -1) { - socket.emit('device.updateGroupDevice', { - important: true, - data: { - serial: message.serial, - group: message.group - } - }) - } - else { - socket.emit('device.removeGroupDevices', {important: true, devices: [message.serial]}) - } - } - else if (user.groups.subscribed.indexOf(message.group.id) > -1) { - socket.emit('device.addGroupDevices', {important: true, devices: [message.serial]}) - } - }) - .on(GroupUserChangeMessage, function(channel, message) { - if (message.users.indexOf(user.email) > -1) { - if (message.isAdded) { - user.groups.subscribed = _.union(user.groups.subscribed, [message.id]) - if (message.devices.length) { - socket.emit('device.addGroupDevices', {important: true, devices: message.devices}) - } - } - else { - if (message.devices.length) { - socket.emit('device.removeGroupDevices', {important: true, devices: message.devices}) - } - if (message.isDeletedLater) { - setTimeout(function() { - user.groups.subscribed = _.without(user.groups.subscribed, message.id) - }, 5000) - } - else { - user.groups.subscribed = _.without(user.groups.subscribed, message.id) - } - } - } - }) - .on(DeviceLogMessage, function(channel, message) { - io.emit('logcat.log', message) - }) - .on(DeviceIntroductionMessage, function(channel, message) { - if (message && message.group && user.groups.subscribed.indexOf(message.group.id) > -1) { - io.emit('device.add', { - important: true, - data: { - serial: message.serial, - present: true, - provider: message.provider, - owner: null, - status: message.status, - ready: false, - reverseForwards: [], - group: message.group - } - }) - } - }) - .on(DeviceReadyMessage, function(channel, message) { - io.emit('device.change', { - important: true, - data: { - serial: message.serial, - channel: message.channel, - owner: null, // @todo Get rid of need to reset this here. - ready: true, - reverseForwards: [] // @todo Get rid of need to reset this here. - } - }) - }) - .on(DevicePresentMessage, function(channel, message) { - io.emit('device.change', { - important: true, - data: { - serial: message.serial, - present: true - } - }) - }) - .on(DeviceAbsentMessage, function(channel, message) { - io.emit('device.remove', { - important: true, - data: { - serial: message.serial, - present: false, - likelyLeaveReason: 'device_absent' - } - }) - }) - .on(InstalledApplications, function(channel, message, data) { - socket.emit('device.applications', { - important: true, - data: { - serial: message.serial, - applications: message.applications - } - }) - }) - // @TODO refactore JoimGroupMessage route - .on(JoinGroupMessage, function(channel, message) { - AllModel.getInstalledApplications({serial: message.serial}) - .then(applications => { - if (!user?.ownedChannels) { - user.ownedChannels = new Set() - } - - if (message?.owner?.email === user?.email) { - user.ownedChannels.add(applications.channel) - } - else if (user.ownedChannels.has(applications.channel)) { - user.ownedChannels.delete(applications.channel) - } - - socket.emit(`device.application-${message.serial}`, { - applications: applications - }) - socket.emit('device.change', { - important: true, - data: datautil.applyOwner({ - serial: message.serial, - owner: message.owner, - likelyLeaveReason: 'owner_change', - usage: message.usage, - applications: applications - }, user) - }) - }) - .catch(err => { - socket.emit('device.change', { - important: true, - data: datautil.applyOwner({ - serial: message.serial, - owner: message.owner, - likelyLeaveReason: 'owner_change', - usage: message.usage - }, user) - }) - }) - }) - .on(JoinGroupByAdbFingerprintMessage, function(channel, message) { - socket.emit('user.keys.adb.confirm', { - title: message.comment, - fingerprint: message.fingerprint - }) - }) - .on(LeaveGroupMessage, function(channel, message) { - io.emit('device.change', { - important: true, - data: datautil.applyOwner({ - serial: message.serial, - owner: null, - likelyLeaveReason: message.reason - }, user) - }) - }) - .on(DeviceStatusMessage, function(channel, message) { - message.likelyLeaveReason = 'status_change' - io.emit('device.change', { - important: true, - data: message - }) - }) - .on(DeviceIdentityMessage, function(channel, message) { - datautil.applyData(message) - io.emit('device.change', { - important: true, - data: message - }) - }) - .on(TransactionProgressMessage, function(channel, message) { - socket.emit('tx.progress', channel.toString(), message) - }) - .on(TransactionDoneMessage, function(channel, message) { - socket.emit('tx.done', channel.toString(), message) - }) - .on(TransactionTreeMessage, function(channel, message) { - socket.emit('tx.tree', channel.toString(), message) - }) - .on(DeviceLogcatEntryMessage, function(channel, message) { - socket.emit('logcat.entry', message) - }) - .on(AirplaneModeEvent, function(channel, message) { - io.emit('device.change', { - important: true, - data: { - serial: message.serial, - airplaneMode: message.enabled - } - }) - }) - .on(BatteryEvent, function(channel, message) { - var {serial} = message - delete message.serial - io.emit('device.change', { - important: false, - data: { - serial: serial, - battery: message - } - }) - }) - .on(GetServicesAvailabilityMessage, function(channel, message) { - let serial = message.serial - delete message.serial - io.emit('device.change', { - important: true, - data: { - serial: serial, - service: message - } - }) - }) - .on(DeviceBrowserMessage, function(channel, message) { - var {serial} = message - delete message.serial - io.emit('device.change', { - important: true, - data: datautil.applyBrowsers({ - serial: serial, - browser: message - }) - }) - }) - .on(ConnectivityEvent, function(channel, message) { - var {serial} = message - delete message.serial - io.emit('device.change', { - important: false, - data: { - serial: serial, - network: message - } - }) - }) - .on(PhoneStateEvent, function(channel, message) { - var {serial} = message - delete message.serial - io.emit('device.change', { - important: false, - data: { - serial: serial, - network: message - } - }) - }) - .on(RotationEvent, function(channel, message) { - socket.emit('device.change', { - important: false, - data: { - serial: message.serial, - display: { - rotation: message.rotation - } - } - }) - }) - .on(CapabilitiesMessage, function(channel, message) { - socket.emit('device.change', { - important: false, - data: { - serial: message.serial, - capabilities: { - hasTouch: message.hasTouch, - hasCursor: message.hasCursor - } - } - }) - }) - .on(ReverseForwardsEvent, function(channel, message) { - socket.emit('device.change', { - important: false, - data: { - serial: message.serial, - reverseForwards: message.forwards - } - }) - }) - .on(TemporarilyUnavailableMessage, function(channel, message) { - socket.emit('temporarily-unavailable', { - data: { - removeConnectUrl: message.removeConnectUrl - } - }) - }) - .on(UpdateRemoteConnectUrl, function(channel, message) { - socket.emit('device.change', { - important: true, - data: { - serial: message.serial - } - }) - }) - .handler() - channelRouter.on(wireutil.global, messageListener) - // User's private group - joinChannel(user.group) - new Promise(function(resolve) { - disconnectSocket = resolve - socket.on('disconnect', resolve) - // Global messages for all clients using socket.io - // - // Device note - .on('device.note', function(data) { - return AllModel - .setDeviceNote(data.serial, data.note) - .then(function() { - return AllModel.loadDevice(user.groups.subscribed, data.serial) - }) - .then(function(device) { - if (device) { - io.emit('device.change', { - important: true, - data: { - serial: device.serial, - notes: device.notes - } - }) - } - }) - }) - // Client specific messages - // - // Settings - .on('user.settings.update', function(data) { - if (data.alertMessage === undefined) { - UserModel.updateUserSettings(user.email, data) - } - else { - UserModel.updateUserSettings(apiutil.STF_ADMIN_EMAIL, data) - } - }) - .on('user.settings.reset', function() { - UserModel.resetUserSettings(user.email) - }) - .on('user.keys.accessToken.generate', async(data) => { - const {title} = data - const token = generateToken(user, options.secret) - await AllModel - .saveUserAccessToken(user.email, { - title: title, - id: token.id, - jwt: token.jwt - }) - socket.emit('user.keys.accessToken.generated', { - title: title, - token: token.jwt - }) - }) - .on('user.keys.accessToken.remove', function(data) { - const isAdmin = user.privilege === apiutil.ADMIN - const email = (isAdmin ? data.email : null) || user.email - return AllModel - .removeUserAccessToken(email, data.title) - .then(function() { - socket.emit('user.keys.accessToken.updated') - }) - }) - .on('user.keys.adb.add', function(data) { - // @ts-ignore - return Adb.util.parsePublicKey(data.key) - .then(function(key) { - return UserModel.lookupUsersByAdbKey(key.fingerprint) - .then(function(keys) { - return keys - }) - .then(function(users) { - if (users.length) { - throw new AllModel.DuplicateSecondaryIndexError() - } - else { - return UserModel.insertUserAdbKey(user.email, { - title: data.title, - fingerprint: key.fingerprint - }) - } - }) - .then(function() { - socket.emit('user.keys.adb.added', { - title: data.title, - fingerprint: key.fingerprint - }) - }) - }) - .then(function() { - push.send([ - wireutil.global, - wireutil.envelope(new wire.AdbKeysUpdatedMessage()) - ]) - }) - .catch(function(err) { - socket.emit('user.keys.adb.error', { - message: err.message - }) - }) - }) - .on('user.keys.adb.accept', function(data) { - return UserModel.lookupUsersByAdbKey(data.fingerprint) - .then(function(keys) { - return keys - }) - .then(function(users) { - if (users.length) { - throw new AllModel.DuplicateSecondaryIndexError() - } - else { - return UserModel.insertUserAdbKey(user.email, { - title: data.title, - fingerprint: data.fingerprint - }) - } - }) - .then(function() { - socket.emit('user.keys.adb.added', { - title: data.title, - fingerprint: data.fingerprint - }) - }) - .then(function() { - push.send([ - user.group, - wireutil.envelope(new wire.AdbKeysUpdatedMessage()) - ]) - }) - // @ts-ignore - .catch(AllModel.DuplicateSecondaryIndexError, function() { - // No-op - }) - }) - .on('user.keys.adb.remove', function(data) { - return UserModel - .deleteUserAdbKey(user.email, data.fingerprint) - .then(function() { - socket.emit('user.keys.adb.removed', data) - }) - }) - .on('shell.settings.execute', function(data) { - let command = data.command - AllModel.loadDevices().then(devices => { - devices.forEach(device => { - push.send([ - device.channel, - wireutil.envelope(new wire.ShellCommandMessage({ - command: command, - timeout: 10000 - })) - ]) - }) - }) - // TODO: support response - // joinChannel(responseChannel) - // push.send([ - // channel - // , wireutil.transaction( - // responseChannel - // , new wire.ShellCommandMessage(data) - // ) - // ]) - }) - // Touch events - .on('input.touchDown', function(channel, data) { - trySendPush([ - channel, - wireutil.envelope(new wire.TouchDownMessage(data.seq, data.contact, data.x, data.y, data.pressure)) - ]) - }) - .on('input.touchMove', function(channel, data) { - try { - trySendPush([ - channel, - wireutil.envelope(new wire.TouchMoveMessage(data.seq, data.contact, data.x, data.y, data.pressure)) - ]) - } - catch (/** @type {any} */ err) { - // workaround for https://github.com/openstf/stf/issues/1180 - log.error('input.touchMove had an error', err.stack) - } - }) - .on('input.touchMoveIos', function(channel, data) { - trySendPush([ - channel, - wireutil.envelope(new wire.TouchMoveIosMessage(data.toX, data.toY, data.fromX, data.fromY, data.duration || 0)) - ]) - }) - .on('tapDeviceTreeElement', function(channel, data) { - trySendPush([ - channel, - wireutil.envelope(new wire.TapDeviceTreeElement(data.label)) - ]) - }) - .on('input.touchUp', function(channel, data) { - trySendPush([ - channel, - wireutil.envelope(new wire.TouchUpMessage(data.seq, data.contact)) - ]) - }) - .on('input.touchCommit', function(channel, data) { - trySendPush([ - channel, - wireutil.envelope(new wire.TouchCommitMessage(data.seq)) - ]) - }) - .on('input.touchReset', function(channel, data) { - trySendPush([ - channel, - wireutil.envelope(new wire.TouchResetMessage(data.seq)) - ]) - }) - .on('input.gestureStart', function(channel, data) { - trySendPush([ - channel, - wireutil.envelope(new wire.GestureStartMessage(data.seq)) - ]) - }) - .on('input.gestureStop', function(channel, data) { - trySendPush([ - channel, - wireutil.envelope(new wire.GestureStopMessage(data.seq)) - ]) - }) - // Key events - .on('input.keyDown', createKeyHandler(KeyDownMessage)) - .on('input.keyUp', createKeyHandler(KeyUpMessage)) - .on('input.keyPress', createKeyHandler(KeyPressMessage)) - .on('input.type', function(channel, data) { - trySendPush([ - channel, - wireutil.envelope(new wire.TypeMessage(data.text)) - ]) - }) - .on('display.rotate', function(channel, data) { - trySendPush([ - channel, - wireutil.envelope(new wire.RotateMessage(data.rotation)) - ]) - }) - .on('quality.change', function(channel, data) { - trySendPush([ - channel, - wireutil.envelope(new wire.ChangeQualityMessage(data.quality)) - ]) - }) - // Transactions - .on('airplane.set', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.AirplaneSetMessage(data)) - ]) - }) - .on('clipboard.paste', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.PasteMessage(data.text)) - ]) - }) - .on('clipboard.copy', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.CopyMessage()) - ]) - }) - .on('clipboard.copyIos', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.CopyMessage()) - ]) - }) - .on('device.identify', function(channel, responseChannel) { - trySendPush([ - channel, - wireutil.transaction(responseChannel, new wire.PhysicalIdentifyMessage()) - ]) - }) - .on('device.reboot', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.RebootMessage()) - ]) - }) - .on('device.rebootIos', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.RebootMessage()) - ]) - }) - .on('account.check', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.AccountCheckMessage(data)) - ]) - }) - .on('account.remove', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.AccountRemoveMessage(data)) - ]) - }) - .on('account.addmenu', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.AccountAddMenuMessage()) - ]) - }) - .on('account.add', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.AccountAddMessage(data.user, data.password)) - ]) - }) - .on('account.get', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.AccountGetMessage(data)) - ]) - }) - .on('sd.status', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.SdStatusMessage()) - ]) - }) - .on('ringer.set', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.RingerSetMessage(data.mode)) - ]) - }) - .on('ringer.get', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.RingerGetMessage()) - ]) - }) - .on('wifi.set', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.WifiSetEnabledMessage(data.enabled)) - ]) - }) - .on('wifi.get', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.WifiGetStatusMessage()) - ]) - }) - .on('bluetooth.set', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.BluetoothSetEnabledMessage(data.enabled)) - ]) - }) - .on('bluetooth.get', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.BluetoothGetStatusMessage()) - ]) - }) - .on('bluetooth.cleanBonds', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.BluetoothCleanBondedMessage()) - ]) - }) - .on('group.invite', async(channel, responseChannel, data) => { - joinChannel(responseChannel) - const keys = await UserModel.getUserAdbKeys(user.email) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.GroupMessage( - new wire.OwnerMessage(user.email, user.name, user.group), - data.timeout || null, - wireutil.toDeviceRequirements(data.requirements), - null, - keys.map(key => key.fingerprint) - )) - ]) - }) - .on('group.kick', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.UngroupMessage(wireutil.toDeviceRequirements(data.requirements))) - ]) - }) - .on('getTreeElementsIos', function(channel, responseChannel) { - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.GetIosTreeElements()) - ]) - }) - .on('tx.cleanup', function(channel) { - leaveChannel(channel) - }) - .on('tx.punch', function(channel) { - joinChannel(channel) - socket.emit('tx.punch', channel) - }) - .on('shell.command', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.ShellCommandMessage(data)) - ]) - }) - .on('shell.keepalive', function(channel, data) { - if (!deviceIsOwned(channel)) { - return - } - - push.send([ - channel, - wireutil.envelope(new wire.ShellKeepAliveMessage(data)) - ]) - }) - .on('device.install', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - const installFlags = ['-r'] - const isApi = false - joinChannel(responseChannel) - push.send([ - channel, - // @ts-ignore - wireutil.transaction(responseChannel, new wire.InstallMessage(data.href, data.launch === true, isApi, JSON.stringify(data.manifest), installFlags, req.internalJwt)) - ]) - }) - .on('device.installIos', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - const isApi = false - joinChannel(responseChannel) - push.send([ - channel, - // @ts-ignore - wireutil.transaction(responseChannel, new wire.InstallMessage(data.href, data.launch === true, isApi, JSON.stringify(data.manifest), [], req.internalJwt)) - ]) - }) - .on('device.uninstall', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.UninstallMessage(data)) - ]) - }) - .on('device.uninstallIos', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.envelope(new wire.UninstallIosMessage(data.packageName)) - ]) - }) - .on('device.launchApp', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.LaunchDeviceApp(data.pkg)) - ]) - }) - .on('device.getApps', _.debounce(async(channel, responseChannel) => { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.GetInstalledApplications()) - ]) - }, 500)) - .on('app.kill', async(channel, responseChannel, data) => { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction( - responseChannel, - data?.force ? - new wire.KillDeviceApp() : - new wire.TerminateDeviceApp() - ) - ]) - }) - .on('app.getAssetList', async(channel, responseChannel) => { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.GetAppAssetsList()) - ]) - }) - .on('app.getAsset', async(channel, responseChannel, data) => { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.GetAppAsset(data.url)) - ]) - }) - .on('app.getAppHTML', async(channel, responseChannel) => { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.GetAppHTML()) - ]) - }) - .on('app.getInspectServerUrl', async(channel, responseChannel) => { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.GetAppInspectServerUrl()) - ]) - }) - .on('device.unlockDevice', function(channel, data) { - if (!deviceIsOwned(channel)) { - return - } - - push.send([ - channel, - wireutil.envelope(new wire.UnlockDeviceMessage()) - ]) - }) - .on('storage.upload', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - request - .postAsync({ - url: util.format('%sapi/v1/resources?channel=%s', options.storageUrl, responseChannel), - json: true, - body: { - url: data.url - } - }) - .catch(function(err) { - log.error('Storage upload had an error', err.stack) - leaveChannel(responseChannel) - socket.emit('tx.cancel', responseChannel, { - success: false, - data: 'fail_upload' - }) - }) - }) - .on('forward.test', function(channel, responseChannel, data) { - joinChannel(responseChannel) - if (!data.targetHost || data.targetHost === 'localhost') { - data.targetHost = user.ip - } - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.ForwardTestMessage(data)) - ]) - }) - .on('forward.create', function(channel, responseChannel, data) { - if (!data.targetHost || data.targetHost === 'localhost') { - data.targetHost = user.ip - } - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.ForwardCreateMessage(data)) - ]) - }) - .on('forward.remove', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.ForwardRemoveMessage(data)) - ]) - }) - .on('logcat.start', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - // #455 and #459 - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.LogcatStartMessage(data)) - ]) - }) - .on('logcat.startIos', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.LogcatStartMessage(data)) - ]) - }) - .on('logcat.stop', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.LogcatStopMessage()) - ]) - }) - .on('logcat.stopIos', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.LogcatStopMessage()) - ]) - }) - .on('connect.start', function(channel, responseChannel) { - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.ConnectStartMessage()) - ]) - }) - .on('connect.startIos', function(channel, responseChannel) { - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.ConnectStartMessage()) - ]) - }) - .on('connect.stop', function(channel, responseChannel) { - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.ConnectStopMessage()) - ]) - }) - .on('browser.open', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.BrowserOpenMessage(data)) - ]) - }) - .on('browser.openIos', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.BrowserOpenMessage(data)) - ]) - }) - .on('browser.clear', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.BrowserClearMessage(data)) - ]) - }) - .on('store.open', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.StoreOpenMessage()) - ]) - }) - .on('store.openIos', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.StoreOpenMessage()) - ]) - }) - .on('settings.open', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - push.send([ - channel, - wireutil.envelope(new wire.DashboardOpenMessage()) - ]) - }) - .on('screen.capture', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.ScreenCaptureMessage()) - ]) - }) - .on('screen.captureIos', function(channel, responseChannel) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.ScreenCaptureMessage()) - ]) - }) - .on('fs.retrieve', function(channel, responseChannel, data) { - joinChannel(responseChannel) - push.send([ - channel, - // @ts-ignore - wireutil.transaction(responseChannel, new wire.FileSystemGetMessage(data.file, req.internalJwt)) - ]) - }) - .on('fs.list', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.FileSystemListMessage(data)) - ]) - }) - .on('fs.listIos', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - wireutil.transaction(responseChannel, new wire.FileSystemListMessage(data)) - ]) - }) - .on('fs.retrieveIos', function(channel, responseChannel, data) { - if (!deviceIsOwned(channel)) { - return - } - - joinChannel(responseChannel) - push.send([ - channel, - // @ts-ignore - wireutil.transaction(responseChannel, new wire.FileSystemGetMessage(data.file, req.internalJwt)) - ]) - }) - .on('policy.accept', function(data) { - UserModel.acceptPolicy(user.email) - }) - }) - .finally(function() { - // Clean up all listeners and subscriptions - channelRouter.removeListener(wireutil.global, messageListener) - channels.forEach(function(channel) { - channelRouter.removeListener(channel, messageListener) - sub.unsubscribe(channel) - }) - socket.disconnect(true) - }) - .catch(function(err) { - log.error('Client had an error, disconnecting due to probable loss of integrity', err.stack) - }) - }) - lifecycle.observe(function() { - [push, pushdev, sub, subdev].forEach(function(sock) { - try { - sock.close() - } - catch (err) { - // No-op - } - }) - }) - server.listen(options.port) - log.info('Listening on port websockets %d', options.port) -}) diff --git a/lib/units/websocket/index.ts b/lib/units/websocket/index.ts new file mode 100644 index 0000000000..55c072050f --- /dev/null +++ b/lib/units/websocket/index.ts @@ -0,0 +1,1604 @@ +/** +* Copyright 2019 contains code contributed by Orange SA, authors: Denis Barbaron - Licensed under the Apache license 2.0 +**/ + +import crypto from 'node:crypto' +import http from 'http' +import _ from 'lodash' +import {Adb} from '@u4/adbkit' +import logger from '../../util/logger.js' +import wireutil from '../../wire/util.js' +import {WireRouter} from '../../wire/router.js' +import datautil from '../../util/datautil.js' +import lifecycle from '../../util/lifecycle.js' +import cookieSession from './middleware/cookie-session.js' +import ip from './middleware/remote-ip.js' +import auth from './middleware/auth.js' +import * as apiutil from '../../util/apiutil.js' +import {Server} from 'socket.io' +import db from '../../db/index.js' +import EventEmitter from 'events' +import generateToken from '../api/helpers/generateToken.js' +import { + UpdateAccessTokenMessage, + DeleteUserMessage, + DeviceChangeMessage, + UserChangeMessage, + GroupChangeMessage, + DeviceGroupChangeMessage, + GroupUserChangeMessage, + DeviceLogMessage, + DeviceIntroductionMessage, + DeviceReadyMessage, + DevicePresentMessage, + DeviceAbsentMessage, + InstalledApplications, + JoinGroupMessage, + JoinGroupByAdbFingerprintMessage, + LeaveGroupMessage, + DeviceStatusMessage, + DeviceIdentityMessage, + TransactionProgressMessage, + TransactionDoneMessage, + TransactionTreeMessage, + DeviceLogcatEntryMessage, + AirplaneModeEvent, + BatteryEvent, + GetServicesAvailabilityMessage, + DeviceBrowserMessage, + ConnectivityEvent, + PhoneStateEvent, + RotationEvent, + CapabilitiesMessage, + ReverseForwardsEvent, + TemporarilyUnavailableMessage, + UpdateRemoteConnectUrl, + KeyDownMessage, + KeyUpMessage, + KeyPressMessage, + TouchDownMessage, + TouchMoveMessage, + TouchMoveIosMessage, + TouchUpMessage, + TouchCommitMessage, + TouchResetMessage, + GestureStartMessage, + GestureStopMessage, + TypeMessage, + TapDeviceTreeElement, + RotateMessage, + ChangeQualityMessage, + AdbKeysUpdatedMessage, + ShellCommandMessage, + ShellKeepAliveMessage, + UninstallIosMessage, + UnlockDeviceMessage, + DashboardOpenMessage, + AirplaneSetMessage, + PasteMessage, + CopyMessage, + PhysicalIdentifyMessage, + RebootMessage, + AccountCheckMessage, + AccountRemoveMessage, + AccountAddMenuMessage, + AccountAddMessage, + AccountGetMessage, + SdStatusMessage, + RingerSetMessage, + RingerGetMessage, + WifiSetEnabledMessage, + WifiGetStatusMessage, + BluetoothSetEnabledMessage, + BluetoothGetStatusMessage, + BluetoothCleanBondedMessage, + GroupMessage, + UngroupMessage, + GetIosTreeElements, + InstallMessage, + UninstallMessage, + LaunchDeviceApp, + GetInstalledApplications, + KillDeviceApp, + TerminateDeviceApp, + GetAppAssetsList, + GetAppAsset, + GetAppHTML, + GetAppInspectServerUrl, + ForwardTestMessage, + ForwardCreateMessage, + ForwardRemoveMessage, + LogcatStartMessage, + LogcatStopMessage, + ConnectStartMessage, + ConnectStopMessage, + BrowserOpenMessage, + BrowserClearMessage, + StoreOpenMessage, + ScreenCaptureMessage, + FileSystemGetMessage, + FileSystemListMessage +} from '../../wire/wire.js' +import AllModel from '../../db/models/all/index.js' +import UserModel from '../../db/models/user/index.js' +import type {MessageType} from '@protobuf-ts/runtime' + +interface Options { + port: number + secret: string + ssid: string + storageUrl: string + endpoints: { + sub: string[] + push: string[] + subdev: string[] + pushdev: string[] + } +} + +export default (async (options: Options) => { + const log = logger.createLogger('websocket') + const server = http.createServer() + + const io = new Server(server, { + serveClient: false, + transports: ['websocket'], + pingTimeout: 60000, + pingInterval: 30000 + }) + + const channelRouter = new EventEmitter() + const zmqSockets = await db.createZMQSockets({...options.endpoints}, log) + const sub = zmqSockets.sub! + const subdev = zmqSockets.subdev! + const push = zmqSockets.push! + const pushdev = zmqSockets.pushdev! + await db.connect({push, pushdev, channelRouter}) + + ;[wireutil.global].forEach((channel) => { + log.info('Subscribing to permanent webosocket channel "%s"', channel) + sub.subscribe(channel) + }) + sub.on('message', (channel: any, data: any) => { + channelRouter.emit(channel.toString(), channel, data) + }) + + io.use(cookieSession({ + name: options.ssid, + keys: [options.secret] + })) + io.use(ip({ + trust: () => true + })) + io.use(auth({secret: options.secret})) + + io.on('connection', (socket) => { + const req = socket.request as any + const user = req.user + const channels: string[] = [] + + user.ip = socket.handshake.query.uip || req.ip + socket.emit('socket.ip', user.ip) + + const joinChannel = (channel: string) => { + channels.push(channel) + log.info('Subscribing to permanent websocket joinChannel channel "%s"', channel) + channelRouter.on(channel, messageListener) + sub.subscribe(channel) + } + + const leaveChannel = (channel: string) => { + const idx = channels.indexOf(channel) + if (idx !== -1) channels.splice(idx, 1) + channelRouter.removeListener(channel, messageListener) + sub.unsubscribe(channel) + } + + const deviceIsOwned = (channel: string) => user?.ownedChannels?.has(channel) + + const serialToChannel = (serial: string) => + crypto.createHash('sha1').update(serial).digest('base64') + + const trySendPush = (args: any[]) => { + if (deviceIsOwned(args[0])) { + return push.send(args) + } + } + + const createKeyHandler = (Klass: MessageType) => + (channel: string, data: any) => { + try { + if (deviceIsOwned(channel)) { + push.send([channel, wireutil.pack(Klass, { + key: data.key + })]) + } + } + catch {} + } + + let disconnectSocket!: (value?: any) => void + + const messageListener = new WireRouter() + .on(UpdateAccessTokenMessage, () => { + socket.emit('user.keys.accessToken.updated') + }) + .on(DeleteUserMessage, () => { + disconnectSocket(true) + }) + .on(DeviceChangeMessage, (channel: string, message: any) => { + if (user.groups.subscribed.indexOf(message.device.group.id) > -1) { + socket.emit('device.change', { + important: true, + data: { + serial: message.device.serial, + group: message.device.group + } + }) + } + if (user.groups.subscribed.indexOf(message.device.group.origin) > -1 || + user.groups.subscribed.indexOf(message.oldOriginGroupId) > -1) { + socket.emit('user.settings.devices.' + message.action, message) + } + }) + .on(UserChangeMessage, (channel: string, message: any) => { + message.targets.forEach((target: any) => { + socket.emit('user.' + target + '.users.' + message.action, message) + }) + }) + .on(GroupChangeMessage, (channel: string, message: any) => { + if (user.privilege === 'admin' || + user.email === message.group.owner.email || + !apiutil.isOriginGroup(message.group.class) && + (message.isChangedDates || message.isChangedClass || message.devices.length)) { + socket.emit('user.settings.groups.' + message.action, message) + } + if (message.subscribers.indexOf(user.email) > -1) { + socket.emit('user.view.groups.' + message.action, message) + } + }) + .on(DeviceGroupChangeMessage, (channel: string, message: any) => { + if (user.groups.subscribed.indexOf(message.id) > -1) { + if (user.groups.subscribed.indexOf(message.group.id) > -1) { + socket.emit('device.updateGroupDevice', { + important: true, + data: { + serial: message.serial, + group: message.group + } + }) + } + else { + socket.emit('device.removeGroupDevices', {important: true, devices: [message.serial]}) + } + } + else if (user.groups.subscribed.indexOf(message.group.id) > -1) { + socket.emit('device.addGroupDevices', {important: true, devices: [message.serial]}) + } + }) + .on(GroupUserChangeMessage, (channel: string, message: any) => { + if (message.users.indexOf(user.email) > -1) { + if (message.isAdded) { + user.groups.subscribed = [...new Set([...user.groups.subscribed, message.id])] + if (message.devices.length) { + socket.emit('device.addGroupDevices', {important: true, devices: message.devices}) + } + } + else { + if (message.devices.length) { + socket.emit('device.removeGroupDevices', {important: true, devices: message.devices}) + } + if (message.isDeletedLater) { + setTimeout(() => { + user.groups.subscribed = user.groups.subscribed.filter((v: string) => v !== message.id) + }, 5000) + } + else { + user.groups.subscribed = user.groups.subscribed.filter((v: string) => v !== message.id) + } + } + } + }) + .on(DeviceLogMessage, (channel: string, message: any) => { + io.emit('logcat.log', message) + }) + .on(DeviceIntroductionMessage, (channel: string, message: any) => { + if (message && message.group && user.groups.subscribed.indexOf(message.group.id) > -1) { + io.emit('device.add', { + important: true, + data: { + serial: message.serial, + present: true, + provider: message.provider, + owner: null, + status: message.status, + ready: false, + reverseForwards: [], + group: message.group + } + }) + } + }) + .on(DeviceReadyMessage, (channel: string, message: any) => { + io.emit('device.change', { + important: true, + data: { + serial: message.serial, + channel: message.channel, + owner: null, // @todo Get rid of need to reset this here. + ready: true, + reverseForwards: [], // @todo Get rid of need to reset this here. + } + }) + }) + .on(DevicePresentMessage, (channel: string, message: any) => { + io.emit('device.change', { + important: true, + data: { + serial: message.serial, + present: true + } + }) + }) + .on(DeviceAbsentMessage, (channel: string, message: any) => { + user?.ownedChannels?.delete(serialToChannel(message.serial)) + + io.emit('device.remove', { + important: true, + data: { + serial: message.serial, + present: false, + likelyLeaveReason: 'device_absent' + } + }) + }) + .on(InstalledApplications, (channel: string, message: any) => { + socket.emit('device.applications', { + important: true, + data: { + serial: message.serial, + applications: message.applications + } + }) + }) + // @TODO refactor JoinGroupMessage route + .on(JoinGroupMessage, (channel: string, message: any) => { + if (!user?.ownedChannels) { + user.ownedChannels = new Set() + } + + user.ownedChannels.add(serialToChannel(message.serial)) + + AllModel.getInstalledApplications({serial: message.serial}) + .then((applications: any) => { + socket.emit(`device.application-${message.serial}`, { + applications + }) + socket.emit('device.change', { + important: true, + data: datautil.applyOwner({ + serial: message.serial, + owner: message.owner, + likelyLeaveReason: 'owner_change', + usage: message.usage, + applications + }, user) + }) + }) + .catch(() => { + socket.emit('device.change', { + important: true, + data: datautil.applyOwner({ + serial: message.serial, + owner: message.owner, + likelyLeaveReason: 'owner_change', + usage: message.usage + }, user) + }) + }) + }) + .on(JoinGroupByAdbFingerprintMessage, (channel: string, message: any) => { + socket.emit('user.keys.adb.confirm', { + title: message.comment, + fingerprint: message.fingerprint + }) + }) + .on(LeaveGroupMessage, (channel: string, message: any) => { + user?.ownedChannels?.delete(serialToChannel(message.serial)) + + io.emit('device.change', { + important: true, + data: datautil.applyOwner({ + serial: message.serial, + owner: null, + likelyLeaveReason: message.reason + }, user) + }) + }) + .on(DeviceStatusMessage, (channel: string, message: any) => { + message.likelyLeaveReason = 'status_change' + io.emit('device.change', { + important: true, + data: message + }) + }) + .on(DeviceIdentityMessage, (channel: string, message: any) => { + datautil.applyData(message) + io.emit('device.change', { + important: true, + data: message + }) + }) + .on(TransactionProgressMessage, (channel: string, message: any) => { + socket.emit('tx.progress', channel.toString(), message) + }) + .on(TransactionDoneMessage, (channel: string, message: any) => { + socket.emit('tx.done', channel.toString(), message) + }) + .on(TransactionTreeMessage, (channel: string, message: any) => { + socket.emit('tx.tree', channel.toString(), message) + }) + .on(DeviceLogcatEntryMessage, (channel: string, message: any) => { + socket.emit('logcat.entry', message) + }) + .on(AirplaneModeEvent, (channel: string, message: any) => { + io.emit('device.change', { + important: true, + data: { + serial: message.serial, + airplaneMode: message.enabled + } + }) + }) + .on(BatteryEvent, (channel: string, message: any) => { + const {serial} = message + delete message.serial + io.emit('device.change', { + important: false, + data: { + serial, + battery: message + } + }) + }) + .on(GetServicesAvailabilityMessage, (channel: string, message: any) => { + const serial = message.serial + delete message.serial + io.emit('device.change', { + important: true, + data: { + serial, + service: message + } + }) + }) + .on(DeviceBrowserMessage, (channel: string, message: any) => { + const {serial} = message + delete message.serial + io.emit('device.change', { + important: true, + data: datautil.applyBrowsers({ + serial, + browser: message + }) + }) + }) + .on(ConnectivityEvent, (channel: string, message: any) => { + const {serial} = message + delete message.serial + io.emit('device.change', { + important: false, + data: { + serial, + network: message + } + }) + }) + .on(PhoneStateEvent, (channel: string, message: any) => { + const {serial} = message + delete message.serial + io.emit('device.change', { + important: false, + data: { + serial, + network: message + } + }) + }) + .on(RotationEvent, (channel: string, message: any) => { + socket.emit('device.change', { + important: false, + data: { + serial: message.serial, + display: { + rotation: message.rotation + } + } + }) + }) + .on(CapabilitiesMessage, (channel: string, message: any) => { + socket.emit('device.change', { + important: false, + data: { + serial: message.serial, + capabilities: { + hasTouch: message.hasTouch, + hasCursor: message.hasCursor + } + } + }) + }) + .on(ReverseForwardsEvent, (channel: string, message: any) => { + socket.emit('device.change', { + important: false, + data: { + serial: message.serial, + reverseForwards: message.forwards + } + }) + }) + .on(TemporarilyUnavailableMessage, (channel: string, message: any) => { + socket.emit('temporarily-unavailable', { + data: { + removeConnectUrl: message.removeConnectUrl + } + }) + }) + .on(UpdateRemoteConnectUrl, (channel: string, message: any) => { + socket.emit('device.change', { + important: true, + data: { + serial: message.serial + } + }) + }) + .handler() + + channelRouter.on(wireutil.global, messageListener) + joinChannel(user.group) + + new Promise((resolve) => { + disconnectSocket = resolve + socket.on('disconnect', () => resolve()) + + socket.on('device.note', async (data: any) => { + await AllModel.setDeviceNote(data.serial, data.note) + const device = await AllModel.loadDevice(user.groups.subscribed, data.serial) + if (device) { + io.emit('device.change', { + important: true, + data: { + serial: device.serial, + notes: device.notes + } + }) + } + }) + + socket.on('user.settings.update', (data: any) => { + if (data.alertMessage === undefined) { + UserModel.updateUserSettings(user.email, data) + } + else { + UserModel.updateUserSettings(apiutil.STF_ADMIN_EMAIL, data) + } + }) + + socket.on('user.settings.reset', () => { + UserModel.resetUserSettings(user.email) + }) + + socket.on('user.keys.accessToken.generate', async (data: any) => { + const {title} = data + const token = generateToken(user, options.secret) + await AllModel.saveUserAccessToken(user.email, { + title, + id: token.id, + jwt: token.jwt + }) + socket.emit('user.keys.accessToken.generated', { + title, + token: token.jwt + }) + }) + + socket.on('user.keys.accessToken.remove', async (data: any) => { + const isAdmin = user.privilege === apiutil.ADMIN + const email = (isAdmin ? data.email : null) || user.email + await AllModel.removeUserAccessToken(email, data.title) + socket.emit('user.keys.accessToken.updated') + }) + + socket.on('user.keys.adb.add', async (data: any) => { + try { + const key = await Adb.util.parsePublicKey(data.key) + const users = await UserModel.lookupUsersByAdbKey(key.fingerprint) + if (users.length) { + throw new AllModel.DuplicateSecondaryIndexError() + } + await UserModel.insertUserAdbKey(user.email, { + title: data.title, + fingerprint: key.fingerprint + }) + socket.emit('user.keys.adb.added', { + title: data.title, + fingerprint: key.fingerprint + }) + push.send([ + wireutil.global, + wireutil.pack(AdbKeysUpdatedMessage, {}) + ]) + } + catch (err: any) { + socket.emit('user.keys.adb.error', { + message: err.message + }) + } + }) + + socket.on('user.keys.adb.accept', async (data: any) => { + try { + const users = await UserModel.lookupUsersByAdbKey(data.fingerprint) + if (users.length) { + throw new AllModel.DuplicateSecondaryIndexError() + } + await UserModel.insertUserAdbKey(user.email, { + title: data.title, + fingerprint: data.fingerprint + }) + socket.emit('user.keys.adb.added', { + title: data.title, + fingerprint: data.fingerprint + }) + push.send([ + user.group, + wireutil.pack(AdbKeysUpdatedMessage, {}) + ]) + } + catch (err: any) { + if (!(err instanceof AllModel.DuplicateSecondaryIndexError)) throw err + } + }) + + socket.on('user.keys.adb.remove', async (data: any) => { + await UserModel.deleteUserAdbKey(user.email, data.fingerprint) + socket.emit('user.keys.adb.removed', data) + }) + + socket.on('shell.settings.execute', async (data: any) => { + if (user.privilege !== apiutil.ADMIN) { + return + } + + const {command} = data + const devices = await AllModel.loadDevices() + devices.forEach((device: any) => { + push.send([ + device.channel, + wireutil.pack(ShellCommandMessage, {command, timeout: 10000}) + ]) + }) + }) + + // Touch events + socket.on('input.touchDown', (channel: string, data: any) => { + try { + trySendPush([ + channel, + wireutil.pack(TouchDownMessage, { + seq: data.seq, + contact: data.contact, + x: data.x, + y: data.y, + pressure: data.pressure + }) + ]) + } + catch {} + }) + + socket.on('input.touchMove', (channel: string, data: any) => { + try { + trySendPush([ + channel, + wireutil.pack(TouchMoveMessage, { + seq: data.seq, + contact: data.contact, + x: data.x, + y: data.y, + pressure: data.pressure + }) + ]) + } + catch {} + }) + + socket.on('input.touchMoveIos', (channel: string, data: any) => { + try { + trySendPush([ + channel, + wireutil.pack(TouchMoveIosMessage, { + toX: data.toX, + toY: data.toY, + fromX: data.fromX, + fromY: data.fromY, + duration: data.duration || 0 + }) + ]) + } + catch {} + }) + + socket.on('tapDeviceTreeElement', (channel: string, data: any) => { + try { + trySendPush([ + channel, + wireutil.pack(TapDeviceTreeElement, { + label: data.label + }) + ]) + } + catch {} + }) + + socket.on('input.touchUp', (channel: string, data: any) => { + try { + trySendPush([ + channel, + wireutil.pack(TouchUpMessage, { + seq: data.seq, + contact: data.contact + }) + ]) + } + catch {} + }) + + socket.on('input.touchCommit', (channel: string, data: any) => { + try { + trySendPush([ + channel, + wireutil.pack(TouchCommitMessage, { + seq: data.seq + }) + ]) + } + catch {} + }) + + socket.on('input.touchReset', (channel: string, data: any) => { + try { + trySendPush([ + channel, + wireutil.pack(TouchResetMessage, { + seq: data.seq + }) + ]) + } + catch {} + }) + + socket.on('input.gestureStart', (channel: string, data: any) => { + try { + trySendPush([ + channel, + wireutil.pack(GestureStartMessage, { + seq: data.seq + }) + ]) + } + catch {} + }) + + socket.on('input.gestureStop', (channel: string, data: any) => { + try { + trySendPush([ + channel, + wireutil.pack(GestureStopMessage, { + seq: data.seq + }) + ]) + } + catch {} + }) + + // Key events + socket.on('input.keyDown', createKeyHandler(KeyDownMessage)) + socket.on('input.keyUp', createKeyHandler(KeyUpMessage)) + socket.on('input.keyPress', createKeyHandler(KeyPressMessage)) + + socket.on('input.type', (channel: string, data: any) => { + try { + trySendPush([ + channel, + wireutil.pack(TypeMessage, { + text: data.text + }) + ]) + } + catch {} + }) + + socket.on('display.rotate', (channel: string, data: any) => { + try { + trySendPush([ + channel, + wireutil.pack(RotateMessage, { + rotation: data.rotation + }) + ]) + } + catch {} + }) + + socket.on('quality.change', (channel: string, data: any) => { + try { + trySendPush([ + channel, + wireutil.pack(ChangeQualityMessage, { + quality: data.quality + }) + ]) + } + catch {} + }) + + // Transactions + socket.on('airplane.set', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, AirplaneSetMessage, {enabled: data.enabled}) + ]) + }) + + socket.on('clipboard.paste', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, PasteMessage, {text: data.text}) + ]) + }) + + socket.on('clipboard.copy', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, CopyMessage, {}) + ]) + }) + + socket.on('clipboard.copyIos', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, CopyMessage, {}) + ]) + }) + + socket.on('device.identify', (channel: string, responseChannel: string) => { + trySendPush([ + channel, + wireutil.tr(responseChannel, PhysicalIdentifyMessage, {}) + ]) + }) + + socket.on('device.reboot', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, RebootMessage, {}) + ]) + }) + + socket.on('device.rebootIos', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, RebootMessage, {}) + ]) + }) + + socket.on('account.check', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, AccountCheckMessage, {type: data.type, account: data.account}) + ]) + }) + + socket.on('account.remove', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, AccountRemoveMessage, {type: data.type, account: data.account}) + ]) + }) + + socket.on('account.addmenu', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, AccountAddMenuMessage, {}) + ]) + }) + + socket.on('account.add', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, AccountAddMessage, {user: data.user, password: data.password}) + ]) + }) + + socket.on('account.get', (channel: string, responseChannel: string, data: any) => { + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, AccountGetMessage, {type: data.type}) + ]) + }) + + socket.on('sd.status', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, SdStatusMessage, {}) + ]) + }) + + socket.on('ringer.set', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, RingerSetMessage, {mode: data.mode}) + ]) + }) + + socket.on('ringer.get', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, RingerGetMessage, {}) + ]) + }) + + socket.on('wifi.set', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, WifiSetEnabledMessage, {enabled: data.enabled}) + ]) + }) + + socket.on('wifi.get', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, WifiGetStatusMessage, {}) + ]) + }) + + socket.on('bluetooth.set', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, BluetoothSetEnabledMessage, {enabled: data.enabled}) + ]) + }) + + socket.on('bluetooth.get', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, BluetoothGetStatusMessage, {}) + ]) + }) + + socket.on('bluetooth.cleanBonds', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, BluetoothCleanBondedMessage, {}) + ]) + }) + + socket.on('group.invite', async (channel: string, responseChannel: string, data: any) => { + joinChannel(responseChannel) + const keys = await UserModel.getUserAdbKeys(user.email) + push.send([ + channel, + wireutil.tr(responseChannel, GroupMessage, { + owner: {email: user.email, name: user.name, group: user.group}, + timeout: data.timeout || undefined, + requirements: wireutil.toDeviceRequirements(data.requirements), + keys: keys.map((key: any) => key.fingerprint) + }) + ]) + }) + + socket.on('group.kick', (channel: string, responseChannel: string, data: any) => { + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, UngroupMessage, { + requirements: wireutil.toDeviceRequirements(data.requirements) + }) + ]) + }) + + socket.on('getTreeElementsIos', (channel: string, responseChannel: string) => { + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, GetIosTreeElements, {}) + ]) + }) + + socket.on('tx.cleanup', (channel: string) => { + leaveChannel(channel) + }) + + socket.on('tx.punch', (channel: string) => { + joinChannel(channel) + socket.emit('tx.punch', channel) + }) + + socket.on('shell.command', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, ShellCommandMessage, {command: data.command, timeout: data.timeout}) + ]) + }) + + socket.on('shell.keepalive', (channel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + push.send([ + channel, + wireutil.pack(ShellKeepAliveMessage, {timeout: data.timeout}) + ]) + }) + + socket.on('device.install', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + const installFlags = ['-r'] + const isApi = false + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, InstallMessage, { + href: data.href, + launch: data.launch === true, + isApi, + manifest: JSON.stringify(data.manifest), + installFlags, + jwt: req.internalJwt + }) + ]) + }) + + socket.on('device.installIos', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + const isApi = false + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, InstallMessage, { + href: data.href, + launch: data.launch === true, + isApi, + manifest: JSON.stringify(data.manifest), + installFlags: [], + jwt: req.internalJwt + }) + ]) + }) + + socket.on('device.uninstall', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, UninstallMessage, {packageName: data.packageName}) + ]) + }) + + socket.on('device.uninstallIos', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.pack(UninstallIosMessage, {packageName: data.packageName}) + ]) + }) + + socket.on('device.launchApp', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, LaunchDeviceApp, {pkg: data.pkg}) + ]) + }) + + socket.on('device.getApps', _.debounce(async (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, GetInstalledApplications, {}) + ]) + }, 500)) + + socket.on('app.kill', async (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + data?.force + ? wireutil.tr(responseChannel, KillDeviceApp, {}) + : wireutil.tr(responseChannel, TerminateDeviceApp, {}) + ]) + }) + + socket.on('app.getAssetList', async (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, GetAppAssetsList, {}) + ]) + }) + + socket.on('app.getAsset', async (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, GetAppAsset, {url: data.url}) + ]) + }) + + socket.on('app.getAppHTML', async (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, GetAppHTML, {}) + ]) + }) + + socket.on('app.getInspectServerUrl', async (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, GetAppInspectServerUrl, {}) + ]) + }) + + socket.on('device.unlockDevice', (channel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + push.send([ + channel, + wireutil.pack(UnlockDeviceMessage, {}) + ]) + }) + + socket.on('storage.upload', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + fetch(`${options.storageUrl}api/v1/resources?channel=${responseChannel}`, { + method: 'POST', + headers: {'Content-Type': 'application/json'}, + body: JSON.stringify({url: data.url}) + }) + .catch((err: any) => { + log.error('Storage upload had an error: %s', err.stack) + leaveChannel(responseChannel) + socket.emit('tx.cancel', responseChannel, { + success: false, + data: 'fail_upload' + }) + }) + }) + + socket.on('forward.test', (channel: string, responseChannel: string, data: any) => { + joinChannel(responseChannel) + if (!data.targetHost || data.targetHost === 'localhost') { + data.targetHost = user.ip + } + push.send([ + channel, + wireutil.tr(responseChannel, ForwardTestMessage, { + targetHost: data.targetHost, + targetPort: data.targetPort + }) + ]) + }) + + socket.on('forward.create', (channel: string, responseChannel: string, data: any) => { + if (!data.targetHost || data.targetHost === 'localhost') { + data.targetHost = user.ip + } + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, ForwardCreateMessage, { + id: data.id, + devicePort: data.devicePort, + targetHost: data.targetHost, + targetPort: data.targetPort + }) + ]) + }) + + socket.on('forward.remove', (channel: string, responseChannel: string, data: any) => { + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, ForwardRemoveMessage, {id: data.id}) + ]) + }) + + socket.on('logcat.start', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, LogcatStartMessage, {filters: data.filters}) + ]) + }) + + socket.on('logcat.startIos', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, LogcatStartMessage, {filters: data.filters}) + ]) + }) + + socket.on('logcat.stop', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, LogcatStopMessage, {}) + ]) + }) + + socket.on('logcat.stopIos', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, LogcatStopMessage, {}) + ]) + }) + + socket.on('connect.start', (channel: string, responseChannel: string) => { + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, ConnectStartMessage, {}) + ]) + }) + + socket.on('connect.startIos', (channel: string, responseChannel: string) => { + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, ConnectStartMessage, {}) + ]) + }) + + socket.on('connect.stop', (channel: string, responseChannel: string) => { + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, ConnectStopMessage, {}) + ]) + }) + + socket.on('browser.open', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, BrowserOpenMessage, {url: data.url, browser: data.browser}) + ]) + }) + + socket.on('browser.openIos', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, BrowserOpenMessage, {url: data.url, browser: data.browser}) + ]) + }) + + socket.on('browser.clear', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, BrowserClearMessage, {browser: data.browser}) + ]) + }) + + socket.on('store.open', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, StoreOpenMessage, {}) + ]) + }) + + socket.on('store.openIos', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, StoreOpenMessage, {}) + ]) + }) + + socket.on('settings.open', (channel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + push.send([ + channel, + wireutil.pack(DashboardOpenMessage, {}) + ]) + }) + + socket.on('screen.capture', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, ScreenCaptureMessage, {} as any) + ]) + }) + + socket.on('screen.captureIos', (channel: string, responseChannel: string) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, ScreenCaptureMessage, {} as any) + ]) + }) + + socket.on('fs.retrieve', (channel: string, responseChannel: string, data: any) => { + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, FileSystemGetMessage, {file: data.file, jwt: req.internalJwt}) + ]) + }) + + socket.on('fs.list', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, FileSystemListMessage, {dir: data.dir}) + ]) + }) + + socket.on('fs.listIos', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, FileSystemListMessage, {dir: data.dir}) + ]) + }) + + socket.on('fs.retrieveIos', (channel: string, responseChannel: string, data: any) => { + if (!deviceIsOwned(channel)) { + return + } + + joinChannel(responseChannel) + push.send([ + channel, + wireutil.tr(responseChannel, FileSystemGetMessage, {file: data.file, jwt: req.internalJwt}) + ]) + }) + + socket.on('policy.accept', () => { + UserModel.acceptPolicy(user.email) + }) + }) + .finally(() => { + channelRouter.removeListener(wireutil.global, messageListener) + channels.forEach((channel) => { + channelRouter.removeListener(channel, messageListener) + sub.unsubscribe(channel) + }) + socket.disconnect(true) + }) + .catch((err: any) => { + log.error('Client had an error, disconnecting due to probable loss of integrity: %s', err.stack) + }) + }) + + lifecycle.observe(() => { + [push, pushdev, sub, subdev].forEach((sock) => { + try { + sock.close() + } + catch (err) { + // No-op + } + }) + }) + + server.listen(options.port) + log.info('Listening on port websockets %s', options.port) +}) diff --git a/lib/units/websocket/middleware/auth.js b/lib/units/websocket/middleware/auth.js deleted file mode 100644 index 5c94577200..0000000000 --- a/lib/units/websocket/middleware/auth.js +++ /dev/null @@ -1,34 +0,0 @@ -import dbapi from '../../../db/api.js' -import * as jwtutil from '../../../util/jwtutil.js' -import logger from '../../../util/logger.js' - - -export default (function(options) { - const log = logger.createLogger('websocket') - return function(socket, next) { - try { - let req = socket.request - const tokenRaw = socket.handshake.auth.token - const token = jwtutil.decode(tokenRaw, options.secret) - req.internalJwt = tokenRaw - return !token?.email ? next(new Error('Invalid user')) : dbapi.loadUser(token.email) - .then(function(user) { - if (user) { - req.user = user - return next() - } - else { - return next(new Error('Invalid user')) - } - }) - .catch((e) => { - log.error(e) - return next(new Error('Unknown error')) - }) - } - catch (e) { - log.error(e) - return next(new Error('Missing authorization token')) - } - } -}) diff --git a/lib/units/websocket/middleware/auth.ts b/lib/units/websocket/middleware/auth.ts new file mode 100644 index 0000000000..84df6f1b81 --- /dev/null +++ b/lib/units/websocket/middleware/auth.ts @@ -0,0 +1,43 @@ +import dbapi from '../../../db/api.js' +import * as jwtutil from '../../../util/jwtutil.js' +import logger from '../../../util/logger.js' +import type {Socket} from 'socket.io' + +interface AuthOptions { + secret: string +} + +export default (options: AuthOptions) => { + const log = logger.createLogger('websocket') + return async (socket: Socket, next: (err?: Error) => void) => { + try { + const req = socket.request as any + const tokenRaw = socket.handshake.auth.token + const token = jwtutil.decode(tokenRaw, options.secret) + req.internalJwt = tokenRaw + + if (!token?.email) { + return next(new Error('Invalid user')) + } + + try { + const user = await dbapi.loadUser(token.email) + if (user) { + req.user = user + return next() + } + else { + return next(new Error('Invalid user')) + } + } + catch (e: any) { + log.error('WS Auth error: %s', e?.message ?? 'Unknown error') + return next(new Error('Unknown error')) + } + } + catch (e: any) { + log.error('WS Auth error: %s', e?.message ?? 'Unknown error') + return next(new Error('Missing authorization token')) + } + } +} diff --git a/lib/units/websocket/middleware/cookie-session.js b/lib/units/websocket/middleware/cookie-session.js deleted file mode 100644 index 9deea3573f..0000000000 --- a/lib/units/websocket/middleware/cookie-session.js +++ /dev/null @@ -1,9 +0,0 @@ -import cookieSession from 'cookie-session' -export default (function(options) { - var session = cookieSession(options) - return function(socket, next) { - var req = socket.request - var res = Object.create(null) - session(req, res, next) - } -}) diff --git a/lib/units/websocket/middleware/cookie-session.ts b/lib/units/websocket/middleware/cookie-session.ts new file mode 100644 index 0000000000..dfb64e1aaf --- /dev/null +++ b/lib/units/websocket/middleware/cookie-session.ts @@ -0,0 +1,16 @@ +import cookieSession from 'cookie-session' +import type {Socket} from 'socket.io' + +interface CookieSessionOptions { + name: string + keys: string[] +} + +export default (options: CookieSessionOptions) => { + const session = cookieSession(options) + return (socket: Socket, next: (err?: Error) => void) => { + const req = socket.request + const res = Object.create(null) + session(req as any, res, next as any) + } +} diff --git a/lib/units/websocket/middleware/remote-ip.js b/lib/units/websocket/middleware/remote-ip.js deleted file mode 100644 index fd67e19b03..0000000000 --- a/lib/units/websocket/middleware/remote-ip.js +++ /dev/null @@ -1,8 +0,0 @@ -import proxyaddr from 'proxy-addr' -export default (function(options) { - return function(socket, next) { - var req = socket.request - req.ip = proxyaddr(req, options.trust) - next() - } -}) diff --git a/lib/units/websocket/middleware/remote-ip.ts b/lib/units/websocket/middleware/remote-ip.ts new file mode 100644 index 0000000000..0cf3382db2 --- /dev/null +++ b/lib/units/websocket/middleware/remote-ip.ts @@ -0,0 +1,17 @@ +import type {Socket} from 'socket.io' + +// TODO: switch dep +// @ts-ignore +import proxyaddr from 'proxy-addr' + +interface RemoteIpOptions { + trust: proxyaddr.TrustFunction +} + +export default (options: RemoteIpOptions) => { + return (socket: Socket, next: (err?: Error) => void) => { + const req = socket.request as any + req.ip = proxyaddr(req, options.trust) + next() + } +} diff --git a/lib/util/zmqutil.js b/lib/util/zmqutil.ts similarity index 51% rename from lib/util/zmqutil.js rename to lib/util/zmqutil.ts index 3a4d67b438..9403c7f603 100644 --- a/lib/util/zmqutil.js +++ b/lib/util/zmqutil.ts @@ -3,50 +3,72 @@ // // This wrapper is designed to make 0MQ v6 backwards compatible with v5 -import * as zmq from 'zeromq' +import { + Publisher, + Subscriber, + Push, + Pull, + Dealer, + Router, + Pair, + Request, + Reply, + Context +} from 'zeromq' import logger from './logger.js' import {EventEmitter} from 'events' const log = logger.createLogger('util:zmqutil') const socketTypeMap = { - pub: zmq.Publisher, - sub: zmq.Subscriber, - push: zmq.Push, - pull: zmq.Pull, - dealer: zmq.Dealer, - router: zmq.Router, - pair: zmq.Pair, - req: zmq.Request, - reply: zmq.Reply -} + pub: Publisher, + sub: Subscriber, + push: Push, + pull: Pull, + dealer: Dealer, + router: Router, + pair: Pair, + req: Request, + reply: Reply +} as const + +const sendHwmByType = { + pub: 5000, + push: 1000, + dealer: 5000, + router: 5000, + req: 1000, + pair: 1000 +} as const satisfies Partial> + +type SocketType = keyof typeof socketTypeMap -// Shared ZMQ context to avoid creating multiple contexts with thread pools -// Each context creates ioThreads (4 by default), so sharing saves resources -/** @type {zmq.Context | null} */ -let sharedContext = null -const getSharedContext = () => { - if (!sharedContext) { - sharedContext = new zmq.Context({ - blocky: true, - ioThreads: 4, - ipv6: true, - maxSockets: 8192, - }) +export class SocketWrapper extends EventEmitter { + static sharedContext: Context + + // Shared ZMQ context to avoid creating multiple contexts with thread pools + // Each context creates ioThreads (4 by default), so sharing saves resources + static getSharedContext = () => { + if (!SocketWrapper.sharedContext) { + SocketWrapper.sharedContext = new Context({ + blocky: true, + ioThreads: 4, + ipv6: true, + maxSockets: 8192, + }) + } + + return SocketWrapper.sharedContext } - return sharedContext -} -export class SocketWrapper extends EventEmitter { - #sendQueue = Promise.resolve() + private sendQueue = Promise.resolve() + private iterator: AsyncIterator | null = null - /** @type {AsyncIterator | null} */ - #iterator = null + public type: string + public isActive = true + public endpoints = new Set() + public socket: InstanceType - /** - * @param {string} type - * @param {number} keepAliveInterval - */ - constructor(type, keepAliveInterval = 30) { + constructor(type: SocketType, keepAliveInterval = 30) { super() if (!(type in socketTypeMap)) { @@ -54,22 +76,26 @@ export class SocketWrapper extends EventEmitter { } this.type = type - this.isActive = true - this.endpoints = new Set() - // @ts-ignore const SocketClass = socketTypeMap[type] this.socket = new SocketClass({ + linger: 2000, tcpKeepalive: 1, tcpKeepaliveIdle: keepAliveInterval, tcpKeepaliveInterval: keepAliveInterval, - tcpKeepaliveCount: 100 - }, getSharedContext()) + tcpKeepaliveCount: 100, + context: SocketWrapper.getSharedContext(), + ...( + type in sendHwmByType && { + sendHighWaterMark: sendHwmByType[type as keyof typeof sendHwmByType] + } + ) + }) } - bindSync = (address) => this.socket.bindSync(address) + bindSync = (address: string) => this.socket.bindSync(address) - connect(endpoint) { + connect(endpoint: string) { this.socket.connect(endpoint) this.endpoints.add(endpoint) log.verbose(`Socket connected to: ${endpoint}`) @@ -77,9 +103,9 @@ export class SocketWrapper extends EventEmitter { return this } - subscribe(topic) { + subscribe(topic: string | Buffer) { if (this.type === 'sub') { - this.socket.subscribe( + (this.socket as Subscriber).subscribe( typeof topic === 'string' ? Buffer.from(topic) : topic ) } @@ -87,33 +113,30 @@ export class SocketWrapper extends EventEmitter { return this } - unsubscribe(topic) { + unsubscribe(topic: string | Buffer) { if (this.type === 'sub') { - this.socket.unsubscribe( + (this.socket as Subscriber).unsubscribe( typeof topic === 'string' ? Buffer.from(topic) : topic ) } return this } - async sendAsync(args) { + async sendAsync(args: any | Array) { try { - await this.socket.send( + await (this.socket as Publisher).send( (Array.isArray(args) ? args : [args]) .map(arg => Buffer.isBuffer(arg) || ArrayBuffer.isView(arg) ? arg : Buffer.from(String(arg))) ) } - catch (/** @type {any} */ err) { + catch (err: any) { log.error('Error on send: %s', err?.message || err?.toString() || JSON.stringify(err)) throw err // Re-throw to properly handle in the promise chain } } - /** - * @param {any} args - */ - send(args) { - this.#sendQueue = this.#sendQueue.then(() => this.sendAsync(args)) + send(args: any | Array) { + this.sendQueue = this.sendQueue.then(() => this.sendAsync(args)) return this } @@ -121,19 +144,19 @@ export class SocketWrapper extends EventEmitter { this.isActive = false // Close async iterator if it exists - if (this.#iterator && typeof this.#iterator.return === 'function') { + if (this.iterator && typeof this.iterator.return === 'function') { try { - await this.#iterator.return() + await this.iterator.return() } catch { // Ignore errors during cleanup } - this.#iterator = null + this.iterator = null } // Wait for send queue to drain before closing socket try { - await this.#sendQueue.catch(() => {}) + await this.sendQueue.catch(() => {}) } catch { // Ignore errors during cleanup @@ -145,27 +168,24 @@ export class SocketWrapper extends EventEmitter { return this } - /** - * - * @returns {Promise} - */ - async startReceiveLoop() { - const isValidType = - this.type === 'sub' || - this.type === 'pull' || - this.type === 'dealer' || - this.type === 'router' || - this.type === 'reply' + async startReceiveLoop(): Promise { + const isValidType = [ + 'sub', + 'pull', + 'dealer', + 'router', + 'reply' + ].includes(this.type) if (!this.isActive || !isValidType) { return } try { - this.#iterator = this.socket[Symbol.asyncIterator]() + this.iterator = (this.socket as Subscriber)[Symbol.asyncIterator]() as typeof this.iterator let result - while (this.isActive && this.#iterator && !(result = await this.#iterator.next()).done) { + while (this.isActive && this.iterator && !(result = await this.iterator.next()).done) { const message = result.value if (Array.isArray(message) && !!message[0]?.toString) { @@ -177,14 +197,14 @@ export class SocketWrapper extends EventEmitter { } } } - catch (/** @type {any} */ err) { + catch (err: any) { log.error('Error in message receive loop: %s, %s', err?.message || err?.toString() || err, err.stack) return this.startReceiveLoop() } } } -export const socket = (type) => { +export const socket = (type: SocketType) => { if (!(type in socketTypeMap)) { throw new Error(`Unsupported socket type: ${type}`) } diff --git a/lib/wire/wire.proto b/lib/wire/wire.proto index c213f71719..6a1c0dace4 100644 --- a/lib/wire/wire.proto +++ b/lib/wire/wire.proto @@ -299,10 +299,12 @@ message DeviceRegisteredMessage { message DevicePresentMessage { required string serial = 1; + optional double presenceChangedAt = 2; } message DeviceAbsentMessage { required string serial = 1; + optional double presenceChangedAt = 2; } message DeviceReadyMessage { diff --git a/lib/wire/wire.ts b/lib/wire/wire.ts index 563e3f9ac2..f944c06afe 100644 --- a/lib/wire/wire.ts +++ b/lib/wire/wire.ts @@ -853,6 +853,10 @@ export interface DevicePresentMessage { * @generated from protobuf field: required string serial = 1 */ serial: string; + /** + * @generated from protobuf field: optional double presenceChangedAt = 2 + */ + presenceChangedAt?: number; } /** * @generated from protobuf message DeviceAbsentMessage @@ -862,6 +866,10 @@ export interface DeviceAbsentMessage { * @generated from protobuf field: required string serial = 1 */ serial: string; + /** + * @generated from protobuf field: optional double presenceChangedAt = 2 + */ + presenceChangedAt?: number; } /** * @generated from protobuf message DeviceReadyMessage @@ -5374,7 +5382,8 @@ export const DeviceRegisteredMessage = new DeviceRegisteredMessage$Type(); class DevicePresentMessage$Type extends MessageType { constructor() { super("DevicePresentMessage", [ - { no: 1, name: "serial", kind: "scalar", T: 9 /*ScalarType.STRING*/ } + { no: 1, name: "serial", kind: "scalar", T: 9 /*ScalarType.STRING*/ }, + { no: 2, name: "presenceChangedAt", kind: "scalar", opt: true, T: 1 /*ScalarType.DOUBLE*/ } ]); } create(value?: PartialMessage): DevicePresentMessage { @@ -5392,6 +5401,9 @@ class DevicePresentMessage$Type extends MessageType { case /* required string serial */ 1: message.serial = reader.string(); break; + case /* optional double presenceChangedAt */ 2: + message.presenceChangedAt = reader.double(); + break; default: let u = options.readUnknownField; if (u === "throw") @@ -5407,6 +5419,9 @@ class DevicePresentMessage$Type extends MessageType { /* required string serial = 1; */ if (message.serial !== "") writer.tag(1, WireType.LengthDelimited).string(message.serial); + /* optional double presenceChangedAt = 2; */ + if (message.presenceChangedAt !== undefined) + writer.tag(2, WireType.Bit64).double(message.presenceChangedAt); let u = options.writeUnknownFields; if (u !== false) (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); @@ -5421,7 +5436,8 @@ export const DevicePresentMessage = new DevicePresentMessage$Type(); class DeviceAbsentMessage$Type extends MessageType { constructor() { super("DeviceAbsentMessage", [ - { no: 1, name: "serial", kind: "scalar", T: 9 /*ScalarType.STRING*/ } + { no: 1, name: "serial", kind: "scalar", T: 9 /*ScalarType.STRING*/ }, + { no: 2, name: "presenceChangedAt", kind: "scalar", opt: true, T: 1 /*ScalarType.DOUBLE*/ } ]); } create(value?: PartialMessage): DeviceAbsentMessage { @@ -5439,6 +5455,9 @@ class DeviceAbsentMessage$Type extends MessageType { case /* required string serial */ 1: message.serial = reader.string(); break; + case /* optional double presenceChangedAt */ 2: + message.presenceChangedAt = reader.double(); + break; default: let u = options.readUnknownField; if (u === "throw") @@ -5454,6 +5473,9 @@ class DeviceAbsentMessage$Type extends MessageType { /* required string serial = 1; */ if (message.serial !== "") writer.tag(1, WireType.LengthDelimited).string(message.serial); + /* optional double presenceChangedAt = 2; */ + if (message.presenceChangedAt !== undefined) + writer.tag(2, WireType.Bit64).double(message.presenceChangedAt); let u = options.writeUnknownFields; if (u !== false) (u == true ? UnknownFieldHandler.onWrite : u)(this.typeName, message, writer); diff --git a/ui/package-lock.json b/ui/package-lock.json index f1885b095d..cc12bad1bf 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -1,12 +1,12 @@ { "name": "vk-devicehub-ui", - "version": "1.5.0", + "version": "1.5.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "vk-devicehub-ui", - "version": "1.5.0", + "version": "1.5.2", "hasInstallScript": true, "dependencies": { "@lukemorales/query-key-factory": "^1.3.4", @@ -21,6 +21,7 @@ "classnames": "^2.5.1", "console-feed": "^3.8.0", "date-fns": "^4.1.0", + "exponential-backoff": "^3.1.3", "i18next": "^23.16.0", "i18next-browser-languagedetector": "^8.0.0", "i18next-http-backend": "^2.6.2", @@ -9236,6 +9237,12 @@ "node": ">=12.0.0" } }, + "node_modules/exponential-backoff": { + "version": "3.1.3", + "resolved": "https://registry.npmjs.org/exponential-backoff/-/exponential-backoff-3.1.3.tgz", + "integrity": "sha512-ZgEeZXj30q+I0EN+CbSSpIyPaJ5HVQD18Z1m+u1FXbAeT94mr1zw50q4q6jiiC447Nl/YTcIYSAftiGqetwXCA==", + "license": "Apache-2.0" + }, "node_modules/extend": { "version": "3.0.2", "resolved": "https://registry.npmjs.org/extend/-/extend-3.0.2.tgz", diff --git a/ui/package.json b/ui/package.json index e406976d84..6429f664bf 100644 --- a/ui/package.json +++ b/ui/package.json @@ -41,6 +41,7 @@ "classnames": "^2.5.1", "console-feed": "^3.8.0", "date-fns": "^4.1.0", + "exponential-backoff": "^3.1.3", "i18next": "^23.16.0", "i18next-browser-languagedetector": "^8.0.0", "i18next-http-backend": "^2.6.2", diff --git a/ui/src/api/socket.ts b/ui/src/api/socket.ts index 090d5f0a23..20664c1ba7 100644 --- a/ui/src/api/socket.ts +++ b/ui/src/api/socket.ts @@ -1,14 +1,65 @@ import { io } from 'socket.io-client' +import { backOff } from 'exponential-backoff' import { variablesConfig } from '@/config/variables.config' import { authStore } from '@/store/auth-store' export const socket = io(variablesConfig[import.meta.env.MODE].websocketUrl, { autoConnect: false, - reconnectionAttempts: 3, - reconnection: true, + reconnection: false, transports: ['websocket'], auth: (cb) => { cb({ token: authStore.jwt }) }, }) + +let reconnecting: Promise | null = null + +function connectSocket(): Promise { + return new Promise((resolve, reject) => { + if (socket.connected) { + resolve() + + return + } + + const onConnect = () => { + socket.off('connect_error', onError) + resolve() + } + + const onError = (err: Error) => { + socket.off('connect', onConnect) + reject(err) + } + + socket.once('connect', onConnect) + socket.once('connect_error', onError) + socket.connect() + }) +} + +export function connectWithBackoff(): Promise { + if (reconnecting) return reconnecting + + reconnecting = backOff(() => connectSocket(), { + numOfAttempts: 10, + startingDelay: 1000, + maxDelay: 30000, + jitter: 'full', + }) + .catch((err) => { + console.error('Socket.IO: all reconnection attempts exhausted', err) + }) + .finally(() => { + reconnecting = null + }) + + return reconnecting +} + +socket.on('disconnect', (reason) => { + if (reason === 'io client disconnect') return + + connectWithBackoff() +}) diff --git a/ui/src/components/app/app-router/require-auth.tsx b/ui/src/components/app/app-router/require-auth.tsx index b68e65942a..a9d0664155 100644 --- a/ui/src/components/app/app-router/require-auth.tsx +++ b/ui/src/components/app/app-router/require-auth.tsx @@ -4,7 +4,7 @@ import { useEffect } from 'react' import { ConditionalRender } from '@/components/lib/conditional-render' -import { socket } from '@/api/socket' +import { connectWithBackoff } from '@/api/socket' import { authStore } from '@/store/auth-store' import { variablesConfig } from '@/config/variables.config' @@ -30,7 +30,7 @@ export const RequireAuth = observer(() => { useEffect(() => { if (authStore.isHydrated && authStore.isAuthed) { - socket.connect() + connectWithBackoff() } }, [authStore.isHydrated, authStore.isAuthed]) diff --git a/ui/src/store/device-screen-store/device-screen-store.ts b/ui/src/store/device-screen-store/device-screen-store.ts index 1bd3337c37..822dd0f7ef 100644 --- a/ui/src/store/device-screen-store/device-screen-store.ts +++ b/ui/src/store/device-screen-store/device-screen-store.ts @@ -1,6 +1,7 @@ import { t } from 'i18next' import { makeAutoObservable, runInAction } from 'mobx' import { inject, injectable } from 'inversify' +import { backOff } from 'exponential-backoff' import { CONTAINER_IDS } from '@/config/inversify/container-ids' import { DeviceBySerialStore } from '@/store/device-by-serial-store' @@ -14,12 +15,8 @@ import type { Device } from '@/generated/types' @injectable() @deviceConnectionRequired() export class DeviceScreenStore { - private readonly websocketReconnectionInterval = 5000 // NOTE: 5s - private readonly websocketReconnectionMaxAttempts = 3 // NOTE: 5s * 3 -> 15s total delay private websocket: WebSocket | null = null - private websocketReconnecting = false - private websocketReconnectionAttempt = 0 - private websocketReconnectionTimeoutID: ReturnType | null = null + private backoffPromise: Promise | null = null private disposed = false private context: ImageBitmapRenderingContext | null = null @@ -45,7 +42,6 @@ export class DeviceScreenStore { constructor(@inject(CONTAINER_IDS.deviceBySerialStore) private deviceBySerialStore: DeviceBySerialStore) { this.updateBounds = this.updateBounds.bind(this) this.messageListener = this.messageListener.bind(this) - this.openListener = this.openListener.bind(this) makeAutoObservable(this) } @@ -75,7 +71,6 @@ export class DeviceScreenStore { this.setIsScreenLoading(true) }) - // NOTE: Prevents ws connection if stopScreenStreaming was called earlier if (this.disposed) { this.disposed = false @@ -85,16 +80,17 @@ export class DeviceScreenStore { this.context = canvas.getContext('bitmaprenderer') this.canvasWrapper = canvasWrapper - this.connectWebsocket() + this.connectWithBackoff() } stopScreenStreaming(): void { this.disposed = true - this.stopWebsocket() + this.backoffPromise = null - if (this.websocketReconnectionTimeoutID) { - clearTimeout(this.websocketReconnectionTimeoutID) - this.websocketReconnectionTimeoutID = null + if (this.websocket) { + this.websocket.onclose = null + this.websocket.close() + this.websocket = null } } @@ -129,14 +125,10 @@ export class DeviceScreenStore { private shouldUpdateScreen(): boolean { return Boolean( - // NO if the user has disabled the screen. this.showScreen && - // NO if the page is not visible (e.g. background tab). document.visibilityState === 'visible' && - // NO if we don't have a connection yet. this.websocket && this.websocket.readyState === WebSocket.OPEN - // YES otherwise ) } @@ -231,49 +223,97 @@ export class DeviceScreenStore { this.determineAspectRatioMode() } - private connectWebsocket(): void { - if (!this.device?.display?.url) { - throw new Error('No display url') - } + private connectWebsocket(): Promise { + return new Promise((resolve, reject) => { + if (!this.device?.display?.url) { + reject(new Error('No display url')) - if (!authStore.jwt) { - console.warn('No JWT token available in authStore') - throw new Error('Authentication token required') - } + return + } - // Pass JWT token securely via WebSocket subprotocol - this.websocket = new WebSocket(this.device.display.url, `access_token.${authStore.jwt}`) + if (!authStore.jwt) { + reject(new Error('Authentication token required')) - this.websocket.binaryType = 'blob' - this.websocket.onopen = this.openListener.bind(this) - this.websocket.onmessage = this.messageListener.bind(this) - this.websocket.onerror = this.errorListener.bind(this) - this.websocket.onclose = this.closeListener.bind(this) - } + return + } - private stopWebsocket(): void { - if (this.websocket) { - this.websocket.close() - this.websocket = null - } + const ws = new WebSocket(this.device.display.url, `access_token.${authStore.jwt}`) + + ws.binaryType = 'blob' + + ws.onopen = () => { + this.websocket = ws + ws.onmessage = this.messageListener.bind(this) + ws.onerror = () => {} + ws.onclose = this.handleUnexpectedClose.bind(this) + + this.isScreenStreamingJustStarted = true + resolve() + } + + ws.onerror = () => {} + + ws.onclose = (event: CloseEvent) => { + if (event.code === 1008) { + reject(new AuthError()) + + return + } + + reject(new Error(`WebSocket closed before opening: ${event.code}`)) + } + }) } - private reconnectWebsocket(): void { - // NOTE: No need reconnect if it is already in progress - if (this.websocketReconnecting || this.websocketReconnectionTimeoutID) return + private connectWithBackoff(): Promise { + if (this.backoffPromise) return this.backoffPromise + + this.backoffPromise = backOff(() => this.connectWebsocket(), { + numOfAttempts: 8, + startingDelay: 1000, + maxDelay: 16000, + jitter: 'full', + retry: (err) => { + if (this.disposed) return false + if (err instanceof AuthError) return false - this.websocketReconnecting = true - this.websocketReconnectionAttempt += 1 - this.connectWebsocket() + return true + }, + }) + .catch((err) => { + if (this.disposed) return + + runInAction(() => { + if (err instanceof AuthError) { + deviceErrorModalStore.setError(t('Unauthorized')) + } else { + deviceErrorModalStore.setError(t('Service is currently unavailable')) + } + }) + }) + .finally(() => { + this.backoffPromise = null + }) + + return this.backoffPromise } - private openListener(): void { - if (this.websocketReconnecting) { - this.websocketReconnecting = false - this.websocketReconnectionAttempt = 0 + private handleUnexpectedClose(event: CloseEvent): void { + this.websocket = null + + runInAction(() => { + this.setIsScreenLoading(true) + }) + + if (event.code === 1008) { + deviceErrorModalStore.setError(t('Unauthorized')) + + return } - this.isScreenStreamingJustStarted = true + if (!event.wasClean && !this.disposed) { + this.connectWithBackoff() + } } private messageListener(message: MessageEvent): void { @@ -297,12 +337,9 @@ export class DeviceScreenStore { } if (message.data === 'secure_on') { - // NOTE: The current view is marked secure and cannot be viewed remotely - return } - // Handle authentication messages if (typeof message.data === 'string') { try { const authMessage = JSON.parse(message.data) @@ -342,30 +379,11 @@ export class DeviceScreenStore { this.screenRotation = startData.orientation } } +} - private errorListener(): void {} - - private closeListener(event: CloseEvent): void { - this.setIsScreenLoading(true) - this.websocketReconnecting = false - - if (event.code === 1008) { - deviceErrorModalStore.setError(t('Unauthorized')) - - return - } - - if (!event.wasClean && this.websocketReconnectionAttempt < this.websocketReconnectionMaxAttempts) { - this.websocketReconnectionTimeoutID = setTimeout(() => { - this.websocketReconnectionTimeoutID = null - this.reconnectWebsocket() - }, this.websocketReconnectionInterval) - - return - } - - if (this.websocketReconnectionAttempt >= this.websocketReconnectionMaxAttempts) { - deviceErrorModalStore.setError(t('Service is currently unavailable')) - } +class AuthError extends Error { + constructor() { + super('Unauthorized') + this.name = 'AuthError' } }