Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions e2e/segments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,4 +352,50 @@ func TestSegmentDetectors(t *testing.T) {
segments[0].StartTime.Format(time.RFC3339), fromMidTrip1.Format(time.RFC3339))
assert.False(t, segments[1].StartedBeforeRange, "Trip 2 should have StartedBeforeRange=false")
})

// Excessive idling: insert engine speed (RPM) in idle range for a contiguous period
idleStart := baseTime.Add(48 * time.Hour)
idleDurationSec := 15 * 60 // 15 minutes
t.Run("StaticRpm", func(t *testing.T) {
idleSignals := generateIdleRpmSignals(idleStart, idleDurationSec)
insertTestSignals(t, conn, idleSignals)

fromIdle := idleStart.Add(-1 * time.Hour)
toIdle := idleStart.Add(time.Duration(idleDurationSec)*time.Second + 1*time.Hour)

detector := ch.NewStaticRpmDetector(conn)
segments, err := detector.DetectSegments(ctx, testTokenID, fromIdle, toIdle, nil)
require.NoError(t, err)

require.Len(t, segments, 1, "Expected 1 static RPM (idling) segment")
seg := segments[0]
assert.False(t, seg.IsOngoing)
assert.NotNil(t, seg.EndTime)
// minSegmentDurationSeconds default is 240 (4 min); we have 15 min of idle
assert.GreaterOrEqual(t, seg.DurationSeconds, int32(240))
t.Logf("Idling segment: %s - %v (duration: %ds)",
seg.StartTime.Format(time.RFC3339), seg.EndTime, seg.DurationSeconds)
})
}

// generateIdleRpmSignals creates powertrainCombustionEngineSpeed signals in idle range (e.g. 800 rpm)
// at 10s intervals for the given duration so 60s windows have enough samples and max(rpm) <= 1000.
func generateIdleRpmSignals(startTime time.Time, durationSeconds int) []testSignal {
const engineSpeedName = "powertrainCombustionEngineSpeed"
const idleRpm = 800.0
signals := []testSignal{}
for offset := 0; offset < durationSeconds; offset += 10 {
ts := startTime.Add(time.Duration(offset) * time.Second)
signals = append(signals, testSignal{
TokenID: testTokenID,
Timestamp: ts,
Name: engineSpeedName,
ValueNumber: idleRpm,
ValueString: "",
Source: testSource,
Producer: testProducer,
CloudEventID: fmt.Sprintf("idle-%s-%d", ts.Format("150405"), offset),
})
}
return signals
}
33 changes: 27 additions & 6 deletions internal/graph/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 12 additions & 6 deletions internal/graph/model/models_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 12 additions & 3 deletions internal/repositories/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ func validateSegmentArgs(tokenID int, from, to time.Time) error {
return nil
}

// validateSegmentConfig validates the segment configuration parameters
func validateSegmentConfig(config *model.SegmentConfig) error {
// validateSegmentConfig validates the segment configuration parameters.
// When mechanism is staticRpm, also validates idling-specific fields.
func validateSegmentConfig(config *model.SegmentConfig, mechanism model.DetectionMechanism) error {
if config == nil {
return nil
}
Expand All @@ -63,6 +64,14 @@ func validateSegmentConfig(config *model.SegmentConfig) error {
}
}

if mechanism == model.DetectionMechanismStaticRpm {
if config.MaxIdleRpm != nil {
if *config.MaxIdleRpm < 300 || *config.MaxIdleRpm > 3000 {
return fmt.Errorf("maxIdleRpm must be between 300 and 3000")
}
}
}

return nil
}

Expand All @@ -73,7 +82,7 @@ func (r *Repository) GetSegments(ctx context.Context, tokenID int, from, to time
return nil, errorhandler.NewBadRequestError(ctx, err)
}

if err := validateSegmentConfig(config); err != nil {
if err := validateSegmentConfig(config, mechanism); err != nil {
return nil, errorhandler.NewBadRequestError(ctx, err)
}

Expand Down
32 changes: 32 additions & 0 deletions internal/repositories/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,35 @@ func TestValidateSegmentArgs(t *testing.T) {
require.Error(t, err)
})
}

func TestValidateSegmentConfig(t *testing.T) {
validConfig := &model.SegmentConfig{}
otherMechanism := model.DetectionMechanismIgnitionDetection
idlingMechanism := model.DetectionMechanismStaticRpm

t.Run("nil config", func(t *testing.T) {
require.NoError(t, validateSegmentConfig(nil, otherMechanism))
require.NoError(t, validateSegmentConfig(nil, idlingMechanism))
})

t.Run("valid config other mechanism", func(t *testing.T) {
require.NoError(t, validateSegmentConfig(validConfig, otherMechanism))
})

t.Run("valid config staticRpm with idling fields", func(t *testing.T) {
cfg := &model.SegmentConfig{
MaxIdleRpm: ptr(1000),
SignalCountThreshold: ptr(5),
}
require.NoError(t, validateSegmentConfig(cfg, idlingMechanism))
})

t.Run("staticRpm maxIdleRpm out of range", func(t *testing.T) {
cfg := &model.SegmentConfig{MaxIdleRpm: ptr(100)}
require.Error(t, validateSegmentConfig(cfg, idlingMechanism))
cfg.MaxIdleRpm = ptr(4000)
require.Error(t, validateSegmentConfig(cfg, idlingMechanism))
})
}

func ptr(i int) *int { return &i }
2 changes: 2 additions & 0 deletions internal/service/ch/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func (s *Service) GetSegments(
detector = &FrequencyDetector{conn: s.conn}
case model.DetectionMechanismChangePointDetection:
detector = &ChangePointDetector{conn: s.conn}
case model.DetectionMechanismStaticRpm:
detector = &StaticRpmDetector{conn: s.conn}
default:
return nil, fmt.Errorf("unknown detection mechanism: %s", mechanism)
}
Expand Down
132 changes: 132 additions & 0 deletions internal/service/ch/static_rpm_detector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package ch

import (
"context"
"fmt"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/DIMO-Network/telemetry-api/internal/graph/model"
)

const (
defaultIdleWindowSizeSeconds = 60 // 1 minute windows (same as frequency detector)
defaultMaxIdleRpm = 1500
engineSpeedSignalName = "powertrainCombustionEngineSpeed" // fixed; not configurable
defaultSignalCountThresholdIdle = 2 // powertrainCombustionEngineSpeed ~2/min; min samples per 1min window
defaultMinIdleRpmForEngineRunning = 0 // min_rpm > this to exclude engine-off
)

// StaticRpmDetector detects segments where engine RPM remains in idle range (static/low RPM).
// Uses repeated windows of idle RPM merged like trips.
type StaticRpmDetector struct {
conn clickhouse.Conn
}

// NewStaticRpmDetector creates a new StaticRpmDetector with the given connection.
func NewStaticRpmDetector(conn clickhouse.Conn) *StaticRpmDetector {
return &StaticRpmDetector{conn: conn}
}

// DetectSegments implements idle-RPM-based segment detection.
func (d *StaticRpmDetector) DetectSegments(
ctx context.Context,
tokenID uint32,
from, to time.Time,
config *model.SegmentConfig,
) ([]*Segment, error) {
maxGap := defaultMinIdleSeconds
minDuration := defaultMinSegmentDurationSeconds
maxIdleRpm := defaultMaxIdleRpm
signalThreshold := defaultSignalCountThresholdIdle

if config != nil {
if config.MinIdleSeconds != nil {
maxGap = *config.MinIdleSeconds
}
if config.MinSegmentDurationSeconds != nil {
minDuration = *config.MinSegmentDurationSeconds
}
if config.MaxIdleRpm != nil {
maxIdleRpm = *config.MaxIdleRpm
}
if config.SignalCountThreshold != nil {
signalThreshold = *config.SignalCountThreshold
}
}

windowSize := defaultIdleWindowSizeSeconds
lookbackFrom := from.Add(-time.Duration(maxGap) * time.Second)
windows, err := d.getIdleWindows(ctx, tokenID, lookbackFrom, to, windowSize, maxIdleRpm, signalThreshold)
if err != nil {
return nil, fmt.Errorf("failed to get idle windows: %w", err)
}

if len(windows) == 0 {
return nil, nil
}

segments := mergeWindowsIntoSegments(tokenID, windows, from, to, maxGap, minDuration)
return segments, nil
}

// getIdleWindows returns time windows where engine speed is in idle band (0 < rpm <= maxIdleRpm).
// Uses signal FINAL; groups by window and keeps windows with sample_count >= signalThreshold and max(rpm) <= maxIdleRpm and min(rpm) > 0.
func (d *StaticRpmDetector) getIdleWindows(
ctx context.Context,
tokenID uint32,
from, to time.Time,
windowSizeSeconds int,
maxIdleRpm int,
signalThreshold int,
) ([]ActiveWindow, error) {
query := `
SELECT
toStartOfInterval(timestamp, INTERVAL ? second) AS window_start,
toStartOfInterval(timestamp, INTERVAL ? second) + INTERVAL ? second AS window_end,
count() AS signal_count,
uniq(name) AS distinct_signal_count
FROM signal FINAL
PREWHERE token_id = ?
WHERE name = ?
AND timestamp >= ?
AND timestamp < ?
GROUP BY window_start
HAVING signal_count >= ? AND max(value_number) <= ? AND min(value_number) > ?
ORDER BY window_start`

rows, err := d.conn.Query(ctx, query,
windowSizeSeconds, windowSizeSeconds, windowSizeSeconds,
tokenID, engineSpeedSignalName, from, to,
signalThreshold, maxIdleRpm, defaultMinIdleRpmForEngineRunning)
if err != nil {
return nil, fmt.Errorf("failed querying idle windows: %w", err)
}
defer func() { _ = rows.Close() }()

expectedWindows := int(to.Sub(from).Seconds()) / windowSizeSeconds
if expectedWindows <= 0 {
expectedWindows = 1
}
windows := make([]ActiveWindow, 0, expectedWindows)

for rows.Next() {
var w ActiveWindow
err := rows.Scan(&w.WindowStart, &w.WindowEnd, &w.SignalCount, &w.DistinctSignalCount)
if err != nil {
return nil, fmt.Errorf("failed scanning idle window row: %w", err)
}
windows = append(windows, w)
}

if rows.Err() != nil {
return nil, fmt.Errorf("idle window row error: %w", rows.Err())
}

return windows, nil
}

// GetMechanismName returns the name of this detection mechanism.
func (d *StaticRpmDetector) GetMechanismName() string {
return "staticRpm"
}
Loading
Loading