Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ OSEM_API_URL="https://api.opensensemap.org/"
DIRECTUS_URL="https://coelho.opensensemap.org"
SENSORWIKI_API_URL="https://api.sensors.wiki/"

MQTT_SERVICE_URL="http://localhost:3001"
MQTT_SERVICE_KEY="dev-service-key-change-in-production"

MYBADGES_API_URL = "https://api.v2.mybadges.org/"
MYBADGES_URL = "https://mybadges.org/"
MYBADGES_SERVERADMIN_USERNAME = ""
Expand Down
6 changes: 6 additions & 0 deletions app/lib/env.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import "dotenv/config"

export const env = {
MQTT_SERVICE_URL: process.env.MQTT_SERVICE_URL!,
MQTT_SERVICE_KEY: process.env.MQTT_SERVICE_KEY!,
}
104 changes: 104 additions & 0 deletions app/lib/mqttClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { env } from "./env"
import { setMqttIntegrationEnabled } from "~/models/integration.server"

interface MqttClientResponse {
success: boolean
deviceId: string
}

interface MqttStatusResponse {
deviceId: string
connected: boolean
}

interface MqttHealthResponse {
status: string
connections: number
timestamp: string
}

class MqttClient {
private baseUrl = env.MQTT_SERVICE_URL
private serviceKey = env.MQTT_SERVICE_KEY

private async request<T>(
endpoint: string,
options: RequestInit = {}
): Promise<T> {
const url = `${this.baseUrl}${endpoint}`

const response = await fetch(url, {
...options,
headers: {
'Content-Type': 'application/json',
'x-service-key': this.serviceKey,
...options.headers,
},
})

if (!response.ok) {
const error = await response.json().catch(() => ({
error: 'Unknown error'
}))
throw new Error(
`MQTT Service error: ${response.status} - ${error.error || error.message || 'Unknown error'}`
)
}

return response.json()
}

/**
* Connect a device to its MQTT broker
*/
async connectBox(params: { box_id: string }): Promise<MqttClientResponse> {
await setMqttIntegrationEnabled(params.box_id, true)
return this.request<MqttClientResponse>(
`/devices/${params.box_id}/connect`,
{ method: 'POST' }
)
}

/**
* Disconnect a device from its MQTT broker
*/
async disconnectBox(params: { box_id: string }): Promise<MqttClientResponse> {
await setMqttIntegrationEnabled(params.box_id, false)
return this.request<MqttClientResponse>(
`/devices/${params.box_id}/disconnect`,
{ method: 'POST' }
)
}

/**
* Reconnect a device (disconnect then connect with fresh config)
*/
async reconnectBox(params: { box_id: string }): Promise<MqttClientResponse> {
return this.request<MqttClientResponse>(
`/devices/${params.box_id}/reconnect`,
{ method: 'POST' }
)
}

/**
* Get connection status for a device
*/
async getStatus(deviceId: string): Promise<MqttStatusResponse> {
return this.request<MqttStatusResponse>(
`/devices/${deviceId}/status`,
{ method: 'GET' }
)
}

/**
* Get health status of the MQTT service
*/
async getHealth(): Promise<MqttHealthResponse> {
return this.request<MqttHealthResponse>(
'/health',
{ method: 'GET' }
)
}
}

export const mqttClient = new MqttClient()
56 changes: 56 additions & 0 deletions app/models/integration.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { eq } from 'drizzle-orm'
import { drizzleClient } from '~/db.server'
import { mqttIntegration, deviceToIntegrations } from '~/schema'

export async function setMqttIntegrationEnabled(
deviceId: string,
enabled: boolean,
) {
await drizzleClient
.update(mqttIntegration)
.set({ enabled })
.where(eq(mqttIntegration.deviceId, deviceId))
}

export async function getMqttIntegrationByDeviceId(deviceId: string) {
const [result] = await drizzleClient
.select({
deviceId: deviceToIntegrations.deviceId,
integrationId: mqttIntegration.id,
enabled: mqttIntegration.enabled,
url: mqttIntegration.url,
topic: mqttIntegration.topic,
messageFormat: mqttIntegration.messageFormat,
decodeOptions: mqttIntegration.decodeOptions,
connectionOptions: mqttIntegration.connectionOptions,
})
.from(deviceToIntegrations)
.innerJoin(
mqttIntegration,
eq(deviceToIntegrations.mqttIntegrationId, mqttIntegration.id),
)
.where(eq(deviceToIntegrations.deviceId, deviceId))
.limit(1)

return result
}

export async function getAllActiveMqttIntegrations() {
return await drizzleClient
.select({
deviceId: deviceToIntegrations.deviceId,
integrationId: mqttIntegration.id,
enabled: mqttIntegration.enabled,
url: mqttIntegration.url,
topic: mqttIntegration.topic,
messageFormat: mqttIntegration.messageFormat,
decodeOptions: mqttIntegration.decodeOptions,
connectionOptions: mqttIntegration.connectionOptions,
})
.from(deviceToIntegrations)
.innerJoin(
mqttIntegration,
eq(deviceToIntegrations.mqttIntegrationId, mqttIntegration.id),
)
.where(eq(mqttIntegration.enabled, true))
}
39 changes: 39 additions & 0 deletions app/routes/api.integrations.$deviceId.mqtt.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { type LoaderFunctionArgs } from "react-router"
import { getMqttIntegrationByDeviceId } from "~/models/integration.server"
import { StandardResponse } from "~/utils/response-utils"

export async function loader({ params, request }: LoaderFunctionArgs) {
try {
const deviceId = params.deviceId

if (!deviceId) {
return StandardResponse.badRequest("Missing deviceId")
}

const key = request.headers.get("x-service-key")

if (key != process.env.MQTT_SERVICE_KEY){
return StandardResponse.unauthorized("Key invalid, access denied.")
}

const integration = await getMqttIntegrationByDeviceId(deviceId)

if (!integration) {
return StandardResponse.notFound("MQTT integration not found")
}

return Response.json({
deviceId: integration.deviceId,
integrationId: integration.integrationId,
enabled: integration.enabled,
url: integration.url,
topic: integration.topic,
messageFormat: integration.messageFormat,
decodeOptions: integration.decodeOptions,
connectionOptions: integration.connectionOptions,
})
} catch (err) {
console.error("Error fetching MQTT integration:", err)
return StandardResponse.internalServerError()
}
}
31 changes: 31 additions & 0 deletions app/routes/api.integrations.mqtt.active.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { type LoaderFunction } from 'react-router'
import { getAllActiveMqttIntegrations } from '~/models/integration.server'
import { StandardResponse } from '~/utils/response-utils'

export const loader: LoaderFunction = async ({ request }) => {
try {
const key = request.headers.get("x-service-key")

if (key != process.env.MQTT_SERVICE_KEY){
return StandardResponse.unauthorized("Key invalid, access denied.")
}

const integrations = await getAllActiveMqttIntegrations()

const response = integrations.map((integration) => ({
deviceId: integration.deviceId,
integrationId: integration.integrationId,
enabled: integration.enabled,
url: integration.url,
topic: integration.topic,
messageFormat: integration.messageFormat,
decodeOptions: integration.decodeOptions,
connectionOptions: integration.connectionOptions,
}))

return Response.json(response)
} catch (err) {
console.error('Error fetching active MQTT integrations:', err)
return StandardResponse.internalServerError()
}
}
70 changes: 70 additions & 0 deletions app/routes/api.measurements.ingest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { type ActionFunctionArgs } from 'react-router'
import { z } from 'zod'
import { getDevice } from '~/models/device.server'
import { saveMeasurements } from '~/models/measurement.server'
import { StandardResponse } from '~/utils/response-utils'

const MeasurementSchema = z.object({
sensor_id: z.string(),
value: z.number(),
createdAt: z.string().datetime(),
location: z
.object({
lat: z.number(),
lng: z.number(),
altitude: z.number().optional(),
})
.optional(),
})

const BatchMeasurementSchema = z.object({
deviceId: z.string(),
measurements: z.array(MeasurementSchema),
})

export async function action({ request }: ActionFunctionArgs) {
try {
let body
try {
body = await request.json()
} catch (err) {
return StandardResponse.badRequest('Invalid JSON in request body')
}

const validationResult = BatchMeasurementSchema.safeParse(body)
if (!validationResult.success) {
return StandardResponse.badRequest(
validationResult.error.errors[0].message,
)
}

const { deviceId, measurements: rawMeasurements } = validationResult.data

const device = await getDevice({ id: deviceId })
if (!device) {
return StandardResponse.notFound('Device not found')
}

if (!device.sensors || device.sensors.length === 0) {
return StandardResponse.badRequest('Device has no sensors configured')
}

const measurements = rawMeasurements.map((m) => ({
sensor_id: m.sensor_id,
value: m.value,
createdAt: m.createdAt ? new Date(m.createdAt) : undefined,
location: m.location,
}))

try {
await saveMeasurements(device, measurements)
} catch (saveErr) {
// Still return 202
}

return new Response(null, { status: 202 })
} catch (err) {
if (err instanceof Response) throw err
return StandardResponse.internalServerError()
}
}
5 changes: 5 additions & 0 deletions app/schema/device.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { location } from './location'
import { logEntry } from './log-entry'
import { sensor } from './sensor'
import { user } from './user'
import { deviceToIntegrations } from './integration'

/**
* Table
Expand Down Expand Up @@ -84,6 +85,10 @@ export const deviceRelations = relations(device, ({ one, many }) => ({
sensors: many(sensor),
locations: many(deviceToLocation),
logEntries: many(logEntry),
integrations: one(deviceToIntegrations, {
fields: [device.id],
references: [deviceToIntegrations.deviceId],
}),
}))

// Many-to-many
Expand Down
Loading
Loading