corellium/NodeMQ

Sensor Subscription Service (NodeMQ)

Real-time sensor data streaming service with Server-Sent Events (SSE) and MQTT-style topic filtering.

Overview

NodeMQ is a Node.js service that receives sensor data from C device models (via HTTP POST), persists it, and streams it to browser clients via Server-Sent Events. It provides MQTT-style wildcard topic subscriptions, message buffering for replay, and comprehensive monitoring.

Features

  • Real-time Streaming - Server-Sent Events (SSE) for efficient server-to-client streaming
  • MQTT-Style Topics - Hierarchical topic structure with + and # wildcards
  • Message Buffering - Configurable per-topic ring buffer for instant replay to new subscribers
  • Parallel Processing - Multi-threaded message processing with worker threads for improved throughput
  • Log-Based Persistence - High-performance append-only log storage (100x faster than file-per-message)
  • Hot-Reload Config - Change buffer size and other settings without restart
  • Health Monitoring - /health and /metrics endpoints for observability
  • Sensor Discovery - /topics endpoint for discovering available sensors
  • FIFO Queues - Bounded ingestion and processing queues with overflow handling

Quick Start

Prerequisites

  • Node.js 18 or later (an LTS release such as 18 or 20 is recommended)
  • npm or yarn
  • For Corellium Generic Linux: SSH access and network connectivity

Local Development

# Install dependencies
npm install

# Build TypeScript
npm run build

# Start service
npm start

The service starts on port 3000 by default.

Complete Deployment Guide

This section covers the full deployment workflow from packaging to running the complete system with the CoreModel peripheral binary.

Step 1: Package the Service

The package.sh script creates a deployment tarball containing:

  • Compiled TypeScript service (dist/)
  • Node.js dependencies (package.json, package-lock.json)
  • Service configuration (config.json)
  • Documentation (README.md)

# From the NodeMQ project root
npm run build
./scripts/package.sh

This creates sensor-service-<version>.tar.gz (e.g., sensor-service-1.0.0.tar.gz).

Step 2: Deploy to Generic Linux

Update the Generic Linux Server

# Update the apt repository
sudo apt update

# Install the required packages from apt
sudo apt install make gcc build-essential git npm nodejs

Transfer the Package

# Copy to your Corellium Generic Linux device
scp sensor-service-1.0.0.tar.gz root@<device-ip>:/tmp/

Extract and Install

# SSH into the device
ssh root@<device-ip>

# Extract the package
cd /tmp
tar -xzf sensor-service-1.0.0.tar.gz
cd sensor-service-1.0.0

# Install Node.js if not already present
apt-get update
curl -fsSL https://deb.nodesource.com/setup_20.x | bash -
apt-get install -y nodejs

# Install the service
# Option 1: Manual installation to /opt/sensor-service
sudo mkdir -p /opt/sensor-service
sudo cp -r dist package.json package-lock.json config.json /opt/sensor-service/
cd /opt/sensor-service
npm install --production

# Option 2: If you have an install.sh script (for systemd setup)
# ./install.sh

Create Systemd Service (Optional)

For automatic startup and management:

# Create systemd service file
sudo tee /etc/systemd/system/sensor-service.service > /dev/null <<EOF
[Unit]
Description=Sensor Subscription Service (NodeMQ)
After=network.target

[Service]
Type=simple
User=root
WorkingDirectory=/opt/sensor-service
ExecStart=/usr/bin/node dist/index.js
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
EOF

# Enable and start the service
sudo systemctl daemon-reload
sudo systemctl enable sensor-service
sudo systemctl start sensor-service

# Check status
sudo systemctl status sensor-service

Step 3: Run the CoreModel Peripheral Binary

The coremodel-spi-max30123-file binary simulates a MAX30123 SPI peripheral and streams telemetry data to the sensor service.

What the Binary Does

  1. Connects to a Corellium VM as a virtual SPI device
  2. Loads sensor data from a CSV file generated by the Viser Python script
  3. Responds to SPI transactions from the VM (register reads/writes)
  4. Streams telemetry data to NodeMQ via HTTP POST to /ingest
  5. Sends two types of data:
    • SPI transactions (reads/writes to registers)
    • Sensor samples (13 types: PSTAT, CHRONO A/B, AP, voltages, etc.)

Prepare the Data Generator

The Viser Python script (Viser_max30123_Meas_Timing.py) generates realistic sensor data based on the MAX30123 register configuration. The commands below copy the CoreModel demo sources to the server and build them:

# From the repo root, create a tarball of the coremodel-ADI-Demo directory under docs
tar -czf docs/coremodel-ADI-Demo.tar.gz -C docs coremodel-ADI-Demo

# Copy the tarball to /home/user on the Generic Linux server
scp docs/coremodel-ADI-Demo.tar.gz user@<server ip address>:/home/user

# On the server, untar the coremodel tarball
tar -xvf coremodel-ADI-Demo.tar.gz

# Navigate to the spi-adi directory and build
cd coremodel-ADI-Demo/examples/spi-adi
make

The Viser script:

  • Reads MAX30123 register configuration from Sequencer_InFile.csv
  • Calculates measurement timing based on register settings
  • Generates realistic sensor data for all 13 measurement types
  • Outputs max30123_FIFO_data.csv with timestamped samples

Start the CoreModel Binary

# Command format:
# coremodel-spi-max30123-file <data_generator> <vm_address:port> <bus_name> <chip_select> [telemetry_host]

# Example with telemetry to localhost:
./coremodel-spi-max30123-file Viser_max30123_Meas_Timing.py 10.11.1.3:1900 spi 0

# Example with custom telemetry host:
./coremodel-spi-max30123-file Viser_max30123_Meas_Timing.py 10.11.1.3:1900 spi 0 192.168.1.100:3000

# Disable telemetry:
./coremodel-spi-max30123-file Viser_max30123_Meas_Timing.py 10.11.1.3:1900 spi 0 NONE

Parameters:

  • data_generator: Python script that generates max30123_FIFO_data.csv
  • vm_address:port: Corellium VM IP and CoreModel port (typically 1900)
  • bus_name: SPI bus identifier (usually spi)
  • chip_select: SPI chip select line (usually 0)
  • telemetry_host: NodeMQ endpoint (defaults to localhost:3000/ingest)

Automated Startup Script

Use the provided startup script to wait for the sensor service and then launch the binary:

# Copy the startup script
cp /opt/sensor-service/docs/start-coremodel.sh /home/user/

# Make it executable
chmod +x /home/user/start-coremodel.sh

# The script:
# 1. Waits for sensor-service to be active
# 2. Waits for port 3000 to be listening
# 3. Launches the coremodel binary with telemetry enabled

# Run it
/home/user/start-coremodel.sh

Step 4: Message Flow from CoreModel to Dashboard

Data Flow Overview

Viser Python Script
    │ generates
    ▼
max30123_FIFO_data.csv (944 samples, 13 sensor types)
    │ loaded by
    ▼
coremodel-spi-max30123-file binary
    │ HTTP POST /ingest
    ▼
NodeMQ Sensor Service (localhost:3000)
    │ validates, persists, routes
    ▼
SSE Stream (Server-Sent Events)
    │ topic-based subscription
    ▼
Browser Dashboard (sensor-dashboard.html)

Topic Structure

Messages are routed using MQTT-style hierarchical topics:

sensors/{sourceModelId}/{deviceBus}/{sensorType}/{sensorId}[/{type}]

Examples from the MAX30123 peripheral:

sensors/max30123-spi/default/PSTAT/max30123-spi
sensors/max30123-spi/default/CHRONO A/max30123-spi
sensors/max30123-spi/default/CHRONO B/max30123-spi
sensors/max30123-spi/default/AP/max30123-spi
sensors/max30123-spi/default/CE1 Voltage/max30123-spi
sensors/max30123-spi/default/GPIO1 Voltage/max30123-spi
sensors/max30123-spi/default/SPI/max30123-spi/spiRead
sensors/max30123-spi/default/SPI/max30123-spi/spiWrite

Subscribing to Topics

Use MQTT-style wildcards to subscribe to multiple sensors:

Pattern                                       Matches
sensors/#                                     All sensors (everything)
sensors/max30123-spi/#                        All data from MAX30123 device
sensors/+/+/PSTAT/+                           PSTAT measurements from any device
sensors/+/+/SPI/+/spiRead                     All SPI read transactions
sensors/max30123-spi/default/+/max30123-spi   All measurement types from MAX30123

Wildcards:

  • + matches exactly one topic level
  • # matches zero or more levels (must be last segment)
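
The two wildcard rules above can be sketched as a small matching function. This is a minimal illustration of MQTT-style matching as described in this README, not the service's actual implementation; the function name topicMatches is hypothetical.

```typescript
// Returns true when `topic` matches the MQTT-style `pattern`.
// "+" matches exactly one level; "#" matches zero or more trailing levels.
function topicMatches(pattern: string, topic: string): boolean {
  const patternLevels = pattern.split("/");
  const topicLevels = topic.split("/");

  for (let i = 0; i < patternLevels.length; i++) {
    const level = patternLevels[i];
    if (level === "#") {
      // "#" is only valid as the final segment; it absorbs whatever remains.
      return i === patternLevels.length - 1;
    }
    if (i >= topicLevels.length) {
      return false; // the pattern has more levels than the topic
    }
    if (level !== "+" && level !== topicLevels[i]) {
      return false; // literal segment mismatch
    }
  }
  // No "#" was seen, so both sides must have the same number of levels.
  return patternLevels.length === topicLevels.length;
}
```

For example, topicMatches("sensors/+/+/PSTAT/+", "sensors/max30123-spi/default/PSTAT/max30123-spi") is true, while the same pattern does not match a topic whose fourth level is AP.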

Example: Subscribe via curl

# Subscribe to all sensors
curl -N "http://localhost:3000/subscribe?topic=sensors/#"

# Subscribe to PSTAT measurements only
curl -N "http://localhost:3000/subscribe?topic=sensors/+/+/PSTAT/+"

# Subscribe to SPI read transactions
curl -N "http://localhost:3000/subscribe?topic=sensors/+/+/SPI/+/spiRead"

The -N flag disables buffering for real-time streaming.
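
When building a subscribe URL in code rather than typing it by hand, note that an unencoded # starts the URL fragment, which HTTP clients do not send to the server, and that form-style query parsers decode + as a space. This README does not say how the service parses the topic parameter, so the sketch below simply percent-encodes the filter, which is the safe general form. The helper name buildSubscribeUrl is hypothetical.

```typescript
// Builds a /subscribe URL with the topic filter percent-encoded, so that
// "#" (fragment delimiter) and "+" (space in form encoding) survive transport.
function buildSubscribeUrl(baseUrl: string, topicFilter: string): string {
  return `${baseUrl}/subscribe?topic=${encodeURIComponent(topicFilter)}`;
}
```

With this helper, buildSubscribeUrl("http://localhost:3000", "sensors/#") yields http://localhost:3000/subscribe?topic=sensors%2F%23.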

Example: Subscribe via Dashboard

Open docs/sensor-dashboard.html in a browser:

# Can be run with npx
npx serve docs

# If running locally
open docs/sensor-dashboard.html

# Or serve it via HTTP
python3 -m http.server 8080
# Then open http://localhost:8080/docs/sensor-dashboard.html

The dashboard:

  1. Connects to http://localhost:3000/subscribe?topic=sensors/#
  2. Receives sensor data via Server-Sent Events (SSE)
  3. Creates one card per sensor type
  4. Updates cards in real-time as data arrives
  5. Shows both sample timestamp (from CSV) and received timestamp

Message Format

Sample data message (PSTAT measurement):

{
  "messageId": "550e8400-e29b-41d4-a716-446655440000",
  "sensorId": "max30123-spi",
  "sensorType": "generate",
  "sourceModelId": "max30123-spi",
  "deviceBus": "default",
  "timestamp": "2024-01-15T10:30:45.123Z",
  "value": {
    "measurement": 32.899302,
    "min": 0,
    "tag": 0,
    "units": "nA",
    "timestamp": 205.788818,
    "bytes": [0, 83, 152],
    "sampleValue": 21400,
    "type": "PSTAT"
  }
}

SPI transaction message:

{
  "messageId": "660e8400-e29b-41d4-a716-446655440001",
  "sensorId": "max30123-spi",
  "sensorType": "spiRead",
  "sourceModelId": "max30123-spi",
  "deviceBus": "default",
  "timestamp": "2024-01-15T10:30:45.456Z",
  "value": {
    "type": "SPI",
    "address": "09",
    "bytes": [0, 0],
    "timestamp": 1771615795710
  }
}

Step 5: Verify the Complete System

# 1. Check sensor service is running
systemctl status sensor-service
curl http://localhost:3000/health

# 2. Check metrics
curl http://localhost:3000/metrics | jq '.'

# 3. Discover available sensors
curl http://localhost:3000/topics | jq '.'

# 4. Subscribe to all sensors and watch data flow
curl -N "http://localhost:3000/subscribe?topic=sensors/#"

# 5. Open the dashboard
# Navigate to http://<device-ip>:3000/docs/sensor-dashboard.html
# or open docs/sensor-dashboard.html locally if port-forwarded

Expected output:

  • Health endpoint returns {"status": "healthy"}
  • Metrics show messagesReceived and messagesProcessed increasing
  • Topics endpoint lists all 13+ sensor types
  • Subscribe shows real-time SSE events with sensor data
  • Dashboard displays live updating cards for each sensor type

Production Deployment

Quick summary:

# 1. Package the service
./scripts/package.sh

# 2. Transfer to server
scp sensor-service-*.tar.gz user@server:/tmp/

# 3. Install on server
ssh user@server
cd /tmp && tar -xzf sensor-service-*.tar.gz
cd sensor-service-*
# Follow manual installation steps from "Step 2: Deploy to Generic Linux" above

Corellium Generic Linux Deployment

See the Complete Deployment Guide above for full step-by-step instructions including:

  • Packaging with package.sh
  • Deploying to Generic Linux via SCP
  • Installing Node.js and the service
  • Running the CoreModel peripheral binary with Viser data generator
  • Understanding message flow from CoreModel → NodeMQ → Dashboard
  • Topic structure and subscription examples

Configuration

Edit /opt/sensor-service/config.json (or config.json for local development):

{
  "heartbeatIntervalMs": 30000,
  "topicBufferSize": 2000,
  "persistencePath": "./data/messages",
  "parallelWorkers": 2,
  "retryConfig": {
    "maxRetries": 5,
    "initialDelayMs": 100,
    "maxDelayMs": 30000,
    "backoffMultiplier": 2
  }
}

Note: standard JSON does not allow comments, so keep annotations out of config.json. parallelWorkers should match the number of virtual cores; this sample uses 2.

Configuration Options:

  • heartbeatIntervalMs - SSE heartbeat interval (default: 30000)
  • topicBufferSize - Max messages buffered per topic for replay (default: 2000)
  • persistencePath - Directory for message persistence (default: "./data/messages")
  • parallelWorkers - Number of worker threads for parallel processing (default: 4, range: 1-16)
  • retryConfig - Retry configuration for failed message processing

Parallel Processing:

The service uses worker threads for multi-threaded message processing. Set parallelWorkers to the number of CPU cores for optimal performance:

# Get CPU core count
node -e "console.log(require('os').cpus().length)"
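
The core count can be turned into a parallelWorkers value that respects the documented 1-16 range. This is a small sketch; the function name chooseParallelWorkers is hypothetical, and how the service itself handles out-of-range values is not stated in this README.

```typescript
// Clamps a CPU core count into the documented parallelWorkers range (1-16).
function chooseParallelWorkers(cpuCount: number): number {
  const MIN_WORKERS = 1;
  const MAX_WORKERS = 16;
  const rounded = Math.floor(cpuCount);
  if (!isFinite(rounded)) return MIN_WORKERS; // NaN or Infinity: fall back to 1
  return Math.min(MAX_WORKERS, Math.max(MIN_WORKERS, rounded));
}
```

For a 2-core VM this returns 2; for a 32-core host it returns 16.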

Running Locally

# Development
npm run dev

# Production
npm start

# With PM2
pm2 start dist/index.js --name sensor-service

# With custom port
PORT=8080 npm start

Managing Service on Corellium

# Start service
systemctl start sensor-service

# Stop service
systemctl stop sensor-service

# Restart service
systemctl restart sensor-service

# View logs
journalctl -u sensor-service -f

API Endpoints

POST /ingest

Ingest sensor data into the service.

curl -X POST http://localhost:3000/ingest \
  -H "Content-Type: application/json" \
  -d '{
    "sensorId": "temp-sensor-1",
    "sensorType": "temperature",
    "value": 23.5,
    "sourceModelId": "device-001"
  }'
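
A client can check a payload's shape before posting it. The sketch below mirrors the four fields in the curl example above; treating all four as required is an assumption, since this README does not list the validation rules (see API-ENDPOINTS.md). The names IngestPayload and isIngestPayload are hypothetical.

```typescript
// Shape of the /ingest body as shown in the curl example (assumed required set).
interface IngestPayload {
  sensorId: string;
  sensorType: string;
  value: unknown; // a number in the curl example, an object for MAX30123 data
  sourceModelId: string;
}

// Type guard: true when `candidate` has the fields used in the example.
function isIngestPayload(candidate: unknown): candidate is IngestPayload {
  if (typeof candidate !== "object" || candidate === null) return false;
  const record = candidate as Record<string, unknown>;
  return (
    typeof record["sensorId"] === "string" &&
    typeof record["sensorType"] === "string" &&
    typeof record["sourceModelId"] === "string" &&
    "value" in record
  );
}
```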

GET /subscribe

Subscribe to sensor data via SSE.

# Subscribe to all sensors
curl -N "http://localhost:3000/subscribe?topic=sensors/#"

# Subscribe to specific sensor type
curl -N "http://localhost:3000/subscribe?topic=sensors/+/+/spiRead/+"

GET /topics

Discover available sensors.

curl "http://localhost:3000/topics"

GET /metrics

Get service metrics.

curl "http://localhost:3000/metrics"

GET /health

Health check.

curl "http://localhost:3000/health"

See API-ENDPOINTS.md for complete API documentation.

Topic Structure

Topics follow this hierarchical format:

sensors/{sourceModelId}/{deviceBus}/{sensorType}/{sensorId}[/{type}]

Example:

sensors/max30123-spi/default/spiRead/max30123-spi/SPI

Wildcards:

  • + - Matches exactly one level
  • # - Matches zero or more levels (must be last segment)

Examples:

  • sensors/# - All sensors
  • sensors/max30123-spi/# - All sensors from specific model
  • sensors/+/+/spiRead/+ - All SPI read operations
  • sensors/+/+/+/+/PSTAT - All PSTAT measurements

Configuration

Buffer Size

The topicBufferSize setting controls how many messages are buffered per topic for replay to new subscribers.

  • Default: 2000 messages per topic
  • Memory: ~400 bytes per message (~800 KB per topic at default)
  • Hot-reloadable: Yes (edit config.json and save)

To tune buffer size, edit topicBufferSize in config.json. Calculate memory usage: topicCount * topicBufferSize * 400 bytes.
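
To illustrate what a per-topic replay buffer does, here is a minimal fixed-capacity ring buffer that overwrites the oldest entry once full. It is a sketch of the general technique, not the code in topic-message-buffer.ts; the class name TopicRingBuffer is hypothetical.

```typescript
// Fixed-capacity ring buffer: once full, each push overwrites the oldest item.
class TopicRingBuffer<T> {
  private readonly slots: (T | undefined)[];
  private nextWrite = 0; // index the next push will write to
  private count = 0;     // number of valid items currently stored

  constructor(capacity: number) {
    if (!(capacity >= 1)) throw new RangeError("capacity must be at least 1");
    this.slots = new Array<T | undefined>(Math.floor(capacity));
  }

  push(item: T): void {
    this.slots[this.nextWrite] = item;
    this.nextWrite = (this.nextWrite + 1) % this.slots.length;
    this.count = Math.min(this.count + 1, this.slots.length);
  }

  // Returns buffered items oldest-first, as a new subscriber would replay them.
  replay(): T[] {
    const out: T[] = [];
    const start = (this.nextWrite - this.count + this.slots.length) % this.slots.length;
    for (let i = 0; i < this.count; i++) {
      out.push(this.slots[(start + i) % this.slots.length] as T);
    }
    return out;
  }
}
```

With a capacity of 3, pushing 1 through 5 leaves replay() returning [3, 4, 5].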

Heartbeat Interval

SSE heartbeat interval in milliseconds (default: 30000 = 30 seconds).

Persistence Path

Directory for message persistence (default: ./data/messages).

Retry Configuration

Controls retry behavior for failed message processing:

  • maxRetries - Maximum retry attempts (default: 5)
  • initialDelayMs - First retry delay (default: 100ms)
  • maxDelayMs - Maximum retry delay (default: 30000ms)
  • backoffMultiplier - Exponential backoff multiplier (default: 2)
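
One plausible way these four settings combine is capped exponential backoff: the delay starts at initialDelayMs, is multiplied by backoffMultiplier on each further attempt, and never exceeds maxDelayMs. The exact formula used by the service is not given in this README, so treat the following as an assumption; retryDelayMs is a hypothetical name.

```typescript
interface RetryConfig {
  maxRetries: number;
  initialDelayMs: number;
  maxDelayMs: number;
  backoffMultiplier: number;
}

// Delay before retry number `attempt` (1-based), or null when retries are exhausted.
function retryDelayMs(attempt: number, config: RetryConfig): number | null {
  if (attempt < 1 || attempt > config.maxRetries) return null;
  const uncapped =
    config.initialDelayMs * Math.pow(config.backoffMultiplier, attempt - 1);
  return Math.min(uncapped, config.maxDelayMs);
}
```

Under this formula the defaults give delays of 100, 200, 400, 800, and 1600 ms, so the 30000 ms cap only comes into play with a larger maxRetries or multiplier.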

Architecture

POST /ingest
    │
    ▼
Ingestion FIFO (256 slots)
    │
    ▼
MessagePersistence (atomic file write)
    │
    ▼
Processing FIFO (128 slots)
    │
    ▼
TopicRegistry + SSEConnectionManager
    │
    ▼
SSE Clients (browser/curl)
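
The two FIFO stages in the diagram are bounded queues. The sketch below shows one way such a queue can behave: it rejects new items when full and counts each rejection. Whether the service drops the newest or the oldest item on overflow is not stated here, so the reject-newest policy is an assumption, and BoundedFifo is a hypothetical name rather than the class in fifo.ts.

```typescript
// Bounded FIFO queue that rejects new items when full and counts overflows.
class BoundedFifo<T> {
  private readonly queue: T[] = [];
  private readonly capacity: number;
  overflowCount = 0;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Returns false (and records an overflow) when the queue is already full.
  enqueue(item: T): boolean {
    if (this.queue.length >= this.capacity) {
      this.overflowCount++;
      return false;
    }
    this.queue.push(item);
    return true;
  }

  // Removes and returns the oldest item, or undefined when empty.
  dequeue(): T | undefined {
    return this.queue.shift();
  }

  get size(): number {
    return this.queue.length;
  }
}
```

A producer that receives false from enqueue can respond with backpressure, for example by returning an error to the HTTP client.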

Dashboard

Open docs/sensor-dashboard.html in a browser to view real-time sensor data.

Features:

  • Real-time sensor data cards
  • Topic subscription management
  • Sensor discovery
  • Auto-reconnect on disconnect

Open docs/sensor-metrics.html for service metrics visualization.

Deployment

Systemd Service (Corellium Generic Linux)

The service is automatically configured as a systemd service when using the deployment package:

# Service management
systemctl start sensor-service
systemctl stop sensor-service
systemctl restart sensor-service
systemctl status sensor-service

# Enable/disable auto-start on boot
systemctl enable sensor-service
systemctl disable sensor-service

# View logs
journalctl -u sensor-service -f
journalctl -u sensor-service -n 100

Manual Deployment

For manual deployment without the package script:

# Build
npm run build

# Copy files to target
scp -r dist/ config.json package.json root@<device-ip>:/opt/sensor-service/

# On target device
cd /opt/sensor-service
npm install --production
node dist/index.js

Docker Deployment (Future)

Docker support is planned for future releases.

Development

Project Structure

.
├── src/
│   ├── index.ts                    # Main entry point
│   ├── services/                   # Business logic services
│   │   ├── sensor-ingestion-service.ts
│   │   ├── sse-connection-manager.ts
│   │   ├── topic-manager.ts
│   │   ├── topic-registry.ts
│   │   ├── message-persistence.ts
│   │   ├── config-manager.ts
│   │   └── health-monitor.ts
│   ├── types/                      # TypeScript types and schemas
│   │   ├── sensor-data.ts
│   │   └── ingested-message.ts
│   └── utils/                      # Utility classes
│       ├── fifo.ts
│       └── topic-message-buffer.ts
├── docs/                           # Documentation
│   ├── API-ENDPOINTS.md
│   ├── SUBSCRIPTION-ARCHITECTURE.md
│   ├── sensor-dashboard.html
│   └── sensor-metrics.html
├── config.json                     # Service configuration
└── package.json

Build

npm run build

Lint

npm run lint

Test

npm test

Monitoring

Health Check

curl http://localhost:3000/health

Returns:

  • healthy (200) - All systems operational
  • degraded (200) - Some non-critical issues
  • unhealthy (503) - Service unavailable
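
Because degraded also returns HTTP 200, a monitor that looks only at the status code cannot tell it apart from healthy. The sketch below reads the state from the response body instead, assuming the body is JSON with a status field as in {"status": "healthy"}; that the other two states use the values degraded and unhealthy is an assumption. The names HealthStatus and readHealthStatus are hypothetical.

```typescript
type HealthStatus = "healthy" | "degraded" | "unhealthy";

// Derives the health state from the HTTP status code and the JSON body text.
// Anything unparseable or unexpected is treated as unhealthy.
function readHealthStatus(httpStatus: number, bodyText: string): HealthStatus {
  if (httpStatus === 503) return "unhealthy";
  try {
    const parsed: unknown = JSON.parse(bodyText);
    if (typeof parsed === "object" && parsed !== null) {
      const reported = (parsed as Record<string, unknown>)["status"];
      if (reported === "healthy") return "healthy";
      if (reported === "degraded") return "degraded";
    }
  } catch {
    // Body was not valid JSON; fall through to the conservative default.
  }
  return "unhealthy";
}
```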

Metrics

curl http://localhost:3000/metrics | jq '.'

Key metrics:

  • messagesReceived - Total messages ingested
  • messagesProcessed - Total messages successfully processed
  • messagesFailed - Total failed messages
  • activeConnections - Current SSE clients
  • averageLatencyMs - Average processing time
  • buffer.topicCount - Topics with buffered messages
  • buffer.totalMessages - Total buffered messages
  • fifo.ingestion.* - Ingestion queue stats
  • fifo.processing.* - Processing queue stats

Troubleshooting

Service Won't Start

# Check logs for errors
journalctl -u sensor-service -n 50

# Common issues:
# - Node.js not installed: apt-get install -y nodejs
# - Port 3000 in use: lsof -i :3000
# - Permission denied: chown -R root:root /opt/sensor-service

3-minute delay before data flows

Cause: Buffer is empty or has very few messages for the subscribed topic.

Solution: Increase topicBufferSize in config.json to 5000-10000 for low-frequency data.

# Edit config
vi /opt/sensor-service/config.json
# Change: "topicBufferSize": 5000

# Restart service
systemctl restart sensor-service

High memory usage

Cause: Buffer size is too large.

Solution: Reduce topicBufferSize in config.json. Calculate: topicCount * topicBufferSize * 400 bytes ≈ memory usage.
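
The estimate above is simple enough to script. The sketch below applies the README's figure of roughly 400 bytes per message; actual message sizes will vary, and the function name estimateBufferBytes is hypothetical.

```typescript
// Approximate per-message size taken from this README (~400 bytes).
const ASSUMED_BYTES_PER_MESSAGE = 400;

// Worst-case buffer memory when every topic's buffer is full.
function estimateBufferBytes(topicCount: number, topicBufferSize: number): number {
  return topicCount * topicBufferSize * ASSUMED_BYTES_PER_MESSAGE;
}
```

One topic at the default size of 2000 comes to 800,000 bytes (about 800 KB), and 13 topics at the same size come to 10,400,000 bytes (about 10.4 MB).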

Messages not being delivered

Cause: FIFO queues are full or overflowing.

Solution: Check /metrics endpoint for fifo.*.overflowCount. If non-zero, the service is receiving data faster than it can process.

curl http://localhost:3000/metrics | jq '.fifo'
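
The same check can be automated against a parsed /metrics response. The sketch below assumes the response contains fifo.ingestion.overflowCount and fifo.processing.overflowCount, which is inferred from the metric names in this README rather than from a documented schema; MetricsSnapshot and overflowingQueues are hypothetical names.

```typescript
interface FifoStats {
  overflowCount: number;
}

// Only the part of the metrics response this check needs (assumed shape).
interface MetricsSnapshot {
  fifo: {
    ingestion: FifoStats;
    processing: FifoStats;
  };
}

// Returns the names of the queues that have dropped at least one message.
function overflowingQueues(metrics: MetricsSnapshot): string[] {
  const result: string[] = [];
  if (metrics.fifo.ingestion.overflowCount > 0) result.push("ingestion");
  if (metrics.fifo.processing.overflowCount > 0) result.push("processing");
  return result;
}
```

An empty result means neither queue has overflowed since the counters were last reset.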

Cannot Connect from Browser

Cause: Firewall or network configuration blocking port 3000.

Solution:

# Check if service is listening
netstat -tlnp | grep 3000

# Allow port through firewall (if applicable)
ufw allow 3000/tcp

Integration with C Device Models

The service is designed to work with C device models that use the coremodel_telemetry library.

C library sends:

{
  "sensorId": "max30123-spi",
  "sensorType": "spiRead",
  "value": {
    "type": "SPI",
    "address": "09",
    "timestamp": 1771615795710,
    "bytes": ["00", "00"]
  },
  "sourceModelId": "max30123-spi"
}

NodeMQ routes to topic:

sensors/max30123-spi/default/spiRead/max30123-spi/SPI
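
The mapping from the message above to this topic can be sketched as follows. It assumes that a missing deviceBus falls back to "default" and that the optional last level comes from value.type, both inferred from the example rather than confirmed by the source; RoutableMessage and buildTopic are hypothetical names.

```typescript
interface RoutableMessage {
  sensorId: string;
  sensorType: string;
  sourceModelId: string;
  deviceBus?: string; // assumed to default to "default" when absent
  value: unknown;
}

// Builds sensors/{sourceModelId}/{deviceBus}/{sensorType}/{sensorId}[/{type}].
function buildTopic(message: RoutableMessage): string {
  const levels = [
    "sensors",
    message.sourceModelId,
    message.deviceBus ?? "default",
    message.sensorType,
    message.sensorId,
  ];
  // Append the optional trailing level when value carries a string "type".
  const payload = message.value;
  if (typeof payload === "object" && payload !== null) {
    const subType = (payload as Record<string, unknown>)["type"];
    if (typeof subType === "string" && subType !== "") levels.push(subType);
  }
  return levels.join("/");
}
```

Applied to the C library message shown above, this produces sensors/max30123-spi/default/spiRead/max30123-spi/SPI.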

Documentation

License

MIT License

Copyright (c) [year] [fullname]

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Contributing

jyamadaCore

Support

For questions or issues, contact the development team.

About

Custom service using built-in message queues in Node to send and receive telemetry data.
