Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose-macos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ services:
container_name: devicehub-api-groups-engine
env_file:
- scripts/variables.env
command: stf groups-engine --connect-push tcp://devicehub-triproxy-app:7170 --connect-push-dev tcp://devicehub-triproxy-dev:7270
command: stf groups-engine --connect-push tcp://devicehub-triproxy-app:7170 --connect-push-dev tcp://devicehub-triproxy-dev:7270 --connect-sub tcp://devicehub-triproxy-app:7150
depends_on:
devicehub-migrate:
condition: service_completed_successfully
Expand Down
2 changes: 1 addition & 1 deletion docker-compose-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ services:
container_name: devicehub-api-groups-engine
env_file:
- scripts/variables.env
command: stf groups-engine --connect-push tcp://devicehub-triproxy-app:7170 --connect-push-dev tcp://devicehub-triproxy-dev:7270
command: stf groups-engine --connect-push tcp://devicehub-triproxy-app:7170 --connect-push-dev tcp://devicehub-triproxy-dev:7270 --connect-sub tcp://devicehub-triproxy-app:7150
depends_on:
devicehub-migrate:
condition: service_completed_successfully
Expand Down
7 changes: 7 additions & 0 deletions lib/cli/groups-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ export const builder = function(yargs) {
array: true,
demand: true
})
.option('connect-sub', {
alias: 's',
describe: 'App-side ZeroMQ PUB endpoint to subscribe to.',
array: true,
demand: true
})
.epilog('Each option can be be overwritten with an environment variable ' +
'by converting the option to uppercase, replacing dashes with ' +
'underscores and prefixing it with `STF_GROUPS_ENGINE_` .)')
Expand All @@ -26,6 +32,7 @@ export const handler = function(argv) {
endpoints: {
push: argv.connectPush,
pushdev: argv.connectPushDev,
sub: argv.connectSub,
}
})
}
1 change: 1 addition & 0 deletions lib/cli/local/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ export const handler = function(argv) {
'groups-engine',
'--connect-push', argv.bindAppPull,
'--connect-push-dev', argv.bindDevPull,
'--connect-sub', argv.bindAppPub,
],
[ // websocket
'websocket',
Expand Down
22 changes: 16 additions & 6 deletions lib/db/models/all/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -605,29 +605,39 @@ export const unsetDeviceOwner = function(serial) {
}

// dbapi.setDevicePresent = function(serial) {
export const setDevicePresent = function(serial) {
export const setDevicePresent = function(serial, presenceChangedAt) {
const timestamp = presenceChangedAt ? new Date(presenceChangedAt) : getNow()
const filter = presenceChangedAt ?
{serial, $or: [{presenceChangedAt: {$lte: timestamp}}, {presenceChangedAt: {$exists: false}}]} :
{serial}

return db.devices.updateOne(
{serial: serial},
filter,
{
$set: {
present: true,
presenceChangedAt: getNow()
presenceChangedAt: timestamp
}
}
)
}

// dbapi.setDeviceAbsent = function(serial) {
export const setDeviceAbsent = function(serial) {
export const setDeviceAbsent = function(serial, presenceChangedAt) {
const timestamp = presenceChangedAt ? new Date(presenceChangedAt) : getNow()
const filter = presenceChangedAt ?
{serial, $or: [{presenceChangedAt: {$lte: timestamp}}, {presenceChangedAt: {$exists: false}}]} :
{serial}

return db.devices.updateOne(
{serial: serial},
filter,
{
$set: {
owner: null,
usage: null,
logs_enabled: false,
present: false,
presenceChangedAt: getNow()
presenceChangedAt: timestamp
}
}
)
Expand Down
13 changes: 9 additions & 4 deletions lib/units/groups-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@ export default (async function(options) {
const {
push
, pushdev
, sub
, channelRouter
} = await db.createZMQSockets(options.endpoints, log)
await db.connect()

devicesWatcher(push, pushdev, channelRouter)
devicesWatcher(push, pushdev, channelRouter, sub)

lifecycle.observe(() =>
[push, pushdev].forEach((sock) => sock.close())
)
lifecycle.observe(() => {
push.close()
pushdev.close()
if (sub) {
sub.close()
}
})
log.info('Groups engine started')
})
54 changes: 20 additions & 34 deletions lib/units/groups-engine/watchers/devices.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import {WireRouter} from '../../../wire/router.js'
import _ from 'lodash'
import util from 'util'
import {v4 as uuidv4} from 'uuid'
Expand All @@ -8,20 +7,10 @@ import wireutil from '../../../wire/util.js'
import wire from '../../../wire/index.js'
import dbapi from '../../../db/api.js'
import db from '../../../db/index.js'
import {LeaveGroupMessage} from '../../../wire/wire.js'
export default (function(push, pushdev, channelRouter) {
import {UngroupMessage} from '../../../wire/wire.js'
import {runTransaction} from '../../../wire/transmanager.js'
export default (function(push, pushdev, channelRouter, sub) {
const log = logger.createLogger('watcher-devices')
function sendReleaseDeviceControl(serial, channel) {
push.send([
channel,
wireutil.envelope(new wire.UngroupMessage(wireutil.toDeviceRequirements({
serial: {
value: serial,
match: 'exact'
}
})))
])
}
function sendDeviceGroupChange(id, group, serial, originName) {
pushdev.send([
wireutil.global,
Expand All @@ -42,24 +31,21 @@ export default (function(push, pushdev, channelRouter) {
wireutil.envelope(new wire.DeviceChangeMessage(publishDevice(), action, device2.group.origin, timeutil.now('nano')))
])
}
function sendReleaseDeviceControlAndDeviceGroupChange(device, sendDeviceGroupChangeWrapper) {
let messageListener
const responseTimer = setTimeout(function() {
channelRouter.removeListener(wireutil.global, messageListener)
sendDeviceGroupChangeWrapper()
}, 5000)
messageListener = new WireRouter()
.on(LeaveGroupMessage, function(channel, message) {
if (message.serial === device.serial &&
message.owner.email === device.owner.email) {
clearTimeout(responseTimer)
channelRouter.removeListener(wireutil.global, messageListener)
sendDeviceGroupChangeWrapper()
}
})
.handler()
channelRouter.on(wireutil.global, messageListener)
sendReleaseDeviceControl(device.serial, device.channel)
async function sendReleaseDeviceControlAndDeviceGroupChange(device, sendDeviceGroupChangeWrapper) {
try {
await runTransaction(device.channel, UngroupMessage, {
requirements: wireutil.toDeviceRequirements({
serial: {
value: device.serial,
match: 'exact'
}
})
}, {sub, push, channelRouter, timeout: 10000})
}
catch (/** @type {any} */ err) {
log.warn('Ungroup transaction for %s failed: %s', device.serial, err?.message)
}
sendDeviceGroupChangeWrapper()
}
let changeStream
db.connect().then(client => {
Expand Down Expand Up @@ -169,7 +155,7 @@ export default (function(push, pushdev, channelRouter) {
if (group) {
if (isDeleted) {
if (oldDoc.owner) {
sendReleaseDeviceControlAndDeviceGroupChange(oldDoc, sendDeviceGroupChangeOnDeviceDeletion)
await sendReleaseDeviceControlAndDeviceGroupChange(oldDoc, sendDeviceGroupChangeOnDeviceDeletion)
return
}
sendDeviceGroupChangeOnDeviceDeletion()
Expand All @@ -185,7 +171,7 @@ export default (function(push, pushdev, channelRouter) {
}
if (isChangeCurrentGroup) {
if (newDoc.owner && group.users.indexOf(newDoc.owner.email) < 0) {
sendReleaseDeviceControlAndDeviceGroupChange(newDoc, sendDeviceGroupChangeOnDeviceCurrentGroupUpdating)
await sendReleaseDeviceControlAndDeviceGroupChange(newDoc, sendDeviceGroupChangeOnDeviceCurrentGroupUpdating)
}
else {
sendDeviceGroupChangeOnDeviceCurrentGroupUpdating()
Expand Down
4 changes: 2 additions & 2 deletions lib/units/processor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ export default db.ensureConnectivity(async(options: Options) => {
dbapi.initializeIosDeviceState(options.publicIp, message)
})
.on(DevicePresentMessage, async (channel, message, data) => {
await dbapi.setDevicePresent(message.serial)
await dbapi.setDevicePresent(message.serial, message.presenceChangedAt)
appDealer.send([channel, data])
})
.on(DeviceAbsentMessage, async (channel, message, data) => {
await dbapi.setDeviceAbsent(message.serial)
await dbapi.setDeviceAbsent(message.serial, message.presenceChangedAt)
appDealer.send([channel, data])
})
.on(DeviceStatusMessage, (channel, message, data) => {
Expand Down
4 changes: 2 additions & 2 deletions lib/units/reaper/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ export default (async(options: Options) => {
log.info('Device "%s" is present', serial)
push.send([
wireutil.global,
wireutil.pack(DevicePresentMessage, {serial})
wireutil.pack(DevicePresentMessage, {serial, presenceChangedAt: Date.now()})
])
})

ttlset.on('drop', (serial) => {
log.info('Reaping device "%s" due to heartbeat timeout', serial)
push.send([
wireutil.global,
wireutil.pack(DeviceAbsentMessage, {serial})
wireutil.pack(DeviceAbsentMessage, {serial, presenceChangedAt: Date.now()})
])
})

Expand Down
Loading